# -*- encoding: utf-8 -*-
import operator
from botocore.exceptions import ClientError
class EcsThrottleException(Exception):
    """
    Raised in place of a botocore ClientError when an ECS API call fails
    with the error code 'ThrottlingException'.
    """
def identify_cluster_by_app_name(client, app_name):
    """
    Given the name of one of our applications (e.g. api, calm_adapter),
    return the ARN of the cluster the task runs on.

    Every cluster is scanned in turn, and the first cluster that has a
    service whose name equals ``app_name`` wins.

    Raises RuntimeError if no cluster has a matching service.
    """
    for cluster_arn in get_cluster_arns(client):
        for service_arn in get_service_arns(client, cluster_arn=cluster_arn):
            # An ECS service ARN comes in one of two formats:
            #
            #   arn:aws:ecs:{aws_region}:{account_id}:service/{service_name}
            #   arn:aws:ecs:{aws_region}:{account_id}:service/{cluster_name}/{service_name}
            #
            # In both, the service name is the final "/"-separated segment,
            # so take that rather than unpacking exactly two parts (which
            # blows up with a ValueError on the longer format).
            #
            # We rely on the service name matching the application name.
            # It would be more robust to use the describeService API, but
            # this saves us a couple of calls on our API quota so we skip it.
            service_name = service_arn.rsplit('/', 1)[-1]
            if service_name == app_name:
                return cluster_arn

    raise RuntimeError(f'Unable to find ECS cluster for {app_name}')
def get_latest_task_definition(client, cluster, service):
    """
    Given the name of a cluster and a service, return the ARN
    for its latest task definition.

    Raises AssertionError (carrying the raw API response) if the response
    does not describe exactly one service, or if that service has no
    deployments.
    """
    resp = client.describe_services(cluster=cluster, services=[service])

    # The top-level structure of a describeServices API response is of the form
    #
    #       {
    #         "failures": [],
    #         "services": [
    #           ...
    #         ]
    #       }
    #
    # Because we only asked for a description of a single service, we expect
    # there to only be a single service.
    #
    # This is an explicit raise rather than an ``assert`` statement so the
    # check still runs under ``python -O``; the exception type is unchanged.
    services = resp['services']
    if len(services) != 1:
        raise AssertionError(resp)
    service_description = services[0]

    # Within a 'service' description, the following structure is what we're
    # interested in:
    #
    #       "deployments": [
    #         {
    #           "createdAt": <date>,
    #           "taskDefinition": <task definition ARN>,
    #           "updatedAt": <date>
    #           ...
    #         },
    #         ... other deployments
    #       ],
    #
    # We pick the deployment with the most recent creation date and report
    # the task definition it uses.
    deployments = service_description['deployments']
    if not deployments:
        raise AssertionError(resp)

    newest_deployment = max(deployments, key=operator.itemgetter('createdAt'))
    return newest_deployment['taskDefinition']
def clone_task_definition(client, task_definition):
    """
    Given a task definition ARN, clone the associated task.

    Returns the new task definition ARN.
    """
    resp = client.describe_task_definition(taskDefinition=task_definition)
    existing = resp['taskDefinition']

    # Only these fields are carried over to the clone: "family",
    # "containerDefinitions", "volumes" and (when set) "taskRoleArn".
    # Any other setting on the original definition is NOT copied.
    register_kwargs = {
        'family': existing['family'],
        'containerDefinitions': existing['containerDefinitions'],
        'volumes': existing.get('volumes', []),
    }

    # A task definition without an IAM task role has no "taskRoleArn" key
    # in its description, so only forward the role when there is one
    # rather than failing with a KeyError.
    task_role_arn = existing.get('taskRoleArn')
    if task_role_arn:
        register_kwargs['taskRoleArn'] = task_role_arn

    new_task = client.register_task_definition(**register_kwargs)
    return new_task['taskDefinition']['taskDefinitionArn']
def _name_from_arn(arn):
return arn.split("/")[1]
def _check_for_throttle_exception(f, *args, **kwargs):
try:
return f(*args, **kwargs)
except ClientError as ex:
if ex.response['Error']['Code'] == 'ThrottlingException':
print(f'ThrottlingException: {ex}')
raise EcsThrottleException(ex)
else:
raise
def get_service_arns(ecs_client, cluster_arn):
    """
    Given a cluster ARN, extracts the associated service ARNs.

    Returns a list of service ARNs, gathered across every page of the
    list_services response.

    Raises EcsThrottleException if any of the API calls is throttled.
    """
    request = {'cluster': _name_from_arn(cluster_arn)}
    service_arns = []

    # list_services hands back results one page at a time; keep asking
    # for the next page until the response no longer carries a nextToken.
    while True:
        page = _check_for_throttle_exception(
            ecs_client.list_services,
            **request
        )
        service_arns.extend(page['serviceArns'])

        next_token = page.get('nextToken')
        if not next_token:
            return service_arns
        request['nextToken'] = next_token
def get_cluster_arns(ecs_client):
    """
    Extract the list of cluster ARNs in this account.

    Returns a list of cluster ARNs, gathered across every page of the
    list_clusters response.

    Raises EcsThrottleException if any of the API calls is throttled.
    """
    request = {}
    cluster_arns = []

    # list_clusters hands back results one page at a time; keep asking
    # for the next page until the response no longer carries a nextToken.
    while True:
        page = _check_for_throttle_exception(
            ecs_client.list_clusters,
            **request
        )
        cluster_arns.extend(page['clusterArns'])

        next_token = page.get('nextToken')
        if not next_token:
            return cluster_arns
        request['nextToken'] = next_token
def describe_cluster(ecs_client, cluster_arn):
    """
    Look up the description of a single cluster, identified by its ARN.

    Returns the first entry of the 'clusters' list in the
    describe_clusters response.
    """
    response = _check_for_throttle_exception(
        ecs_client.describe_clusters,
        clusters=[cluster_arn]
    )

    # Exactly one cluster was requested, so the first entry is the one
    # we want.
    descriptions = response['clusters']
    return descriptions[0]
def describe_service(ecs_client, cluster_arn, service_arn):
    """
    Look up the description of a single service, identified by the ARN
    of its cluster and its own ARN.

    Returns the first entry of the 'services' list in the
    describe_services response.
    """
    # The API is called with names rather than full ARNs.
    cluster_name = _name_from_arn(cluster_arn)
    service_name = _name_from_arn(service_arn)

    response = _check_for_throttle_exception(
        ecs_client.describe_services,
        cluster=cluster_name,
        services=[service_name]
    )

    # Exactly one service was requested, so the first entry is the one
    # we want.
    descriptions = response['services']
    return descriptions[0]
def run_task(
        ecs_client,
        cluster_name,
        task_definition,
        started_by,
        container_name="app",
        command=None):
    """
    Run a given command against a named container in a task definition
    on a particular cluster.

    ``command`` is the list of command arguments used to override the
    container's command; when omitted, an empty list is sent.

    Returns the response from calling run_task
    """
    # Use a None sentinel instead of a mutable ``[]`` default, so no list
    # object is shared between calls. The request sent is unchanged.
    if command is None:
        command = []

    container_override = {
        'name': container_name,
        'command': command
    }

    return ecs_client.run_task(
        cluster=cluster_name,
        taskDefinition=task_definition,
        overrides={'containerOverrides': [container_override]},
        count=1,
        startedBy=started_by,
    )