#!/usr/bin/env /usr/openv/pdde/pdshared/bin/python3 # $Copyright: Copyright (c) 2022 Veritas Technologies LLC. All rights reserved $ from __future__ import absolute_import from system_utils import ssl ssl.enable_fips_140_2() import os import sys import boto3 import json import argparse from argparse import RawTextHelpFormatter from datetime import date, datetime EC_OK = 0 EC_ERROR = 1 EC_BAD_USAGE = 2 def _read_uri_content(uri): FILE_URI = 'file://' if not uri.startswith(FILE_URI): raise NotImplementedError('Only support URI starts with \'{0}\'.'.format(FILE_URI)) with open(uri[len(FILE_URI):]) as fp: content = fp.read() #print('uri content: ', content) return content def _read_uri_content_to_json(uri): json_content = json.loads(_read_uri_content(uri)) return json_content def get_boto3_version(): return boto3.__version__ def json_serial(obj): """JSON serializer for objects not serializable by default json code""" if isinstance(obj, (datetime, date)): return obj.isoformat() raise TypeError('Object of type {0} not JSON serializable'.format(type(obj))) def _generate_response(response): #print('Raw response: \n', response) # Delete ResponseMetadata key in response response.pop('ResponseMetadata', None) if not response: return '' return json.dumps(response, indent=4, default=json_serial) def iam_get_role(role_name): iam_client = boto3.client('iam') response = iam_client.get_role(RoleName=role_name) return _generate_response(response) def iam_create_role(role_name, role_policy_document_uri): iam_client = boto3.client('iam') response = iam_client.create_role(RoleName=role_name, AssumeRolePolicyDocument=_read_uri_content(role_policy_document_uri)) return _generate_response(response) def iam_delete_role(role_name): iam_client = boto3.client('iam') response = iam_client.delete_role(RoleName=role_name) return _generate_response(response) def iam_list_role_policies(role_name): iam_client = boto3.client('iam') ret = {} ret['PolicyNames'] = [] response = iam_client.list_role_policies(RoleName=role_name) ret['PolicyNames'].extend(response['PolicyNames']) while response['IsTruncated']: response = iam_client.list_role_policies(RoleName=role_name, Marker=response['Marker']) ret['PolicyNames'].extend(response['PolicyNames']) return _generate_response(ret) def iam_put_role_policy(role_name, policy_name, policy_document_uri): iam_client = boto3.client('iam') response = iam_client.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=_read_uri_content(policy_document_uri)) return _generate_response(response) def iam_get_role_policy(role_name, policy_name): iam_client = boto3.client('iam') response = iam_client.get_role_policy(RoleName=role_name, PolicyName=policy_name) return _generate_response(response) def iam_delete_role_policy(role_name, policy_name): iam_client = boto3.client('iam') response = iam_client.delete_role_policy(RoleName=role_name, PolicyName=policy_name) return _generate_response(response) def ec2_import_image(role_name, description, disk_containers_uri, dry_run): ec2_client = boto3.client('ec2') if dry_run: response = ec2_client.import_image(DryRun=True) else: response = ec2_client.import_image(RoleName=role_name, Description=description, DiskContainers=_read_uri_content_to_json(disk_containers_uri)) return _generate_response(response) def ec2_describe_import_image_tasks(import_task_ids): ec2_client = boto3.client('ec2') response = ec2_client.describe_import_image_tasks(ImportTaskIds=import_task_ids) return _generate_response(response) def ec2_create_tags(resources, tags): ec2_resource = boto3.resource('ec2') response = ec2_resource.create_tags(Resources=resources, Tags=tags) return _generate_response(response) def ec2_describe_images(owners, filters): ec2_client = boto3.client('ec2') if filters is None: response = ec2_client.describe_images(Owners=owners) else: response = ec2_client.describe_images(Owners=owners, Filters=filters) return _generate_response(response) def s3api_create_bucket(bucket, region, create_bucket_configuration): if region: s3_client = boto3.client('s3', region_name=region) else: s3_client = boto3.client('s3') if create_bucket_configuration is None: response = s3_client.create_bucket(Bucket=bucket) else: response = s3_client.create_bucket(Bucket=bucket, CreateBucketConfiguration=create_bucket_configuration) return _generate_response(response) def s3api_get_bucket_location(bucket): s3_client = boto3.client('s3') response = s3_client.get_bucket_location(Bucket=bucket) return _generate_response(response) def s3api_delete_object(bucket, key): s3_client = boto3.client('s3') response = s3_client.delete_object(Bucket=bucket, Key=key) return _generate_response(response) def s3_stream_upload_object(bucket, key): s3_client = boto3.client('s3') s3_client.upload_fileobj(sys.stdin.buffer, bucket, key) return 'Object {0} uploaded to bucket {1} successfully.'.format(key, bucket) def _eprint(*args, **kwargs): # The first '\n' is to mimic awscli error output print('\n', end='') print(*args, file=sys.stderr, **kwargs) def _iam_create_role_handler(args): if not args.role_name or not args.assume_role_policy_document: print('usage: {0} iam create-role --role-name --assume-role-policy-document file://'.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_create_role(args.role_name, args.assume_role_policy_document) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_delete_role_handler(args): if not args.role_name: print('usage: {0} iam delete-role --role-name '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_delete_role(args.role_name) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_get_role_handler(args): if not args.role_name: print('usage: {0} iam get-role --role-name '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_get_role(args.role_name) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_put_role_policy_handler(args): if not args.role_name or not args.policy_name or not args.policy_document: print('usage: {0} iam put-role-policy --role-name --policy-name --policy-document file://'.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_put_role_policy(args.role_name, args.policy_name, args.policy_document) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_list_role_policies_handler(args): if not args.role_name: print('usage: {0} iam list-role-policies --role-name '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_list_role_policies(args.role_name) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_get_role_policy_handler(args): if not args.role_name or not args.policy_name: print('usage: {0} iam get-role-policy --role-name --policy-name '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_get_role_policy(args.role_name, args.policy_name) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _iam_delete_role_policy_handler(args): if not args.role_name or not args.policy_name: print('usage: {0} iam delete-role-policy --role-name --policy-name '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = iam_delete_role_policy(args.role_name, args.policy_name) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _ec2_import_image_handler(args): if not args.dry_run and (not args.role_name or not args.description or not args.disk_containers): print('usage: {0} ec2 import-image [--dry-run] --role-name --description --disk-containers file://'.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = ec2_import_image(args.role_name, args.description, args.disk_containers, args.dry_run) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _ec2_describe_import_image_tasks_handler(args): if not args.import_task_ids: print('usage: {0} ec2 describe-import-image-tasks --import-task-ids '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = ec2_describe_import_image_tasks([args.import_task_ids]) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK # dict_keys example: ['Name', 'Value'] # dict_string example: 'Name=myname,Value=myvalue' def _parse_dict(dict_keys, dict_string): if not dict_string: return None dict_string_list = dict_string.split(',') if len(dict_string_list) < len(dict_keys): return None ret_dict = {} for i, cur_key in enumerate(dict_keys): if dict_string_list[i].startswith(cur_key + '='): value = dict_string_list[i][len(cur_key) + 1:] else: return None ret_dict[cur_key] = value return ret_dict def _parse_tag(tag_string): tag_keys = [] tag_keys.append('Key') tag_keys.append('Value') return _parse_dict(tag_keys, tag_string) def _parse_filter(filter_string): filter_keys = [] filter_keys.append('Name') filter_keys.append('Values') ret = _parse_dict(filter_keys, filter_string) ret['Values'] = [ret['Values']] return ret def _ec2_create_tags_handler(args): tag = _parse_tag(args.tags) if not args.resources or tag is None: print('usage: {0} ec2 create-tags --resources --tags Key=,Value='.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = ec2_create_tags([args.resources], [tag]) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _ec2_describe_images_handler(args): filter_dict = _parse_filter(args.filters) if not args.owners: print('usage: {0} ec2 describe-images --owners [--filters Name=,Values=]'.format(os.path.basename(__file__))) return EC_BAD_USAGE if filter_dict is None: filter_list = None else: filter_list = [filter_dict] try: ret = ec2_describe_images([args.owners], filter_list) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _s3api_create_bucket_handler(args): usage = 'usage: {0} s3api create-bucket --bucket [--region ] [--create-bucket-configuration LocationConstraint=]'.format(os.path.basename(__file__)) if not args.bucket: print(usage) return EC_BAD_USAGE create_bucket_configuration = None if args.create_bucket_configuration: create_bucket_configuration = _parse_dict(['LocationConstraint'], args.create_bucket_configuration) if create_bucket_configuration is None: print(usage) return EC_BAD_USAGE try: ret = s3api_create_bucket(args.bucket, args.region, create_bucket_configuration) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _s3api_get_bucket_location_handler(args): if not args.bucket: print('usage: {0} s3api get-bucket-location --bucket '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = s3api_get_bucket_location(args.bucket) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _s3api_delete_object_handler(args): if not args.bucket or not args.key: print('usage: {0} s3api delete-object --bucket --key '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = s3api_delete_object(args.bucket, args.key) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK def _s3_stream_upload_object_handler(args): if not args.bucket or not args.key: print('usage: {0} s3 stream-upload --bucket --key '.format(os.path.basename(__file__))) return EC_BAD_USAGE try: ret = s3_stream_upload_object(args.bucket, args.key) print(ret) except Exception as ex: _eprint(ex) return EC_ERROR return EC_OK # Service -> [{Opertaions -> operation_handler},] map # New operation string and handler function should be added here supported_services_operations = {} supported_services_operations['iam'] = [ {'create-role': _iam_create_role_handler}, {'delete-role': _iam_delete_role_handler}, {'get-role': _iam_get_role_handler}, {'put-role-policy': _iam_put_role_policy_handler}, {'list-role-policies': _iam_list_role_policies_handler}, {'get-role-policy': _iam_get_role_policy_handler}, {'delete-role-policy': _iam_delete_role_policy_handler}, ] supported_services_operations['ec2'] = [ {'import-image': _ec2_import_image_handler}, {'describe-import-image-tasks': _ec2_describe_import_image_tasks_handler}, {'create-tags': _ec2_create_tags_handler}, {'describe-images': _ec2_describe_images_handler}, ] supported_services_operations['s3api'] = [ {'create-bucket': _s3api_create_bucket_handler}, {'get-bucket-location': _s3api_get_bucket_location_handler}, {'delete-object': _s3api_delete_object_handler}, ] supported_services_operations['s3'] = [ {'stream-upload': _s3_stream_upload_object_handler}, ] # Generated from supported_services_operations by _get_operation_list() # Format: {'service_name1': ['operation1', 'operation2', ...], 'service_name2': ['operation1', 'operation2', ...], ...} _supported_services_operation_list = {} def _get_operation_list(service): # If already generated if service in _supported_services_operation_list: # print('found cache for ' + service) return _supported_services_operation_list[service] # print('generate list for ' + service) # Generate operation list operation_list = [] for operation_func in supported_services_operations[service]: operation_list.extend(list(operation_func.keys())) # Save to dict _supported_services_operation_list[service] = operation_list return operation_list def _convert_list_to_string(list): ret = '' i = 0 list_len = len(list) while i < list_len: if i == 0: ret += list[0] elif i != list_len - 1: ret += ', ' + list[i] else: ret += ' and ' + list[i] i += 1 return ret def _get_service_help_text(): return 'AWS service name, supports ' + _convert_list_to_string(list(supported_services_operations.keys())) + '.' def _get_operation_help_text(): help_text = 'AWS operation.\n' for service in supported_services_operations: help_text += 'For ' + service + ' service, it supports ' + _convert_list_to_string(_get_operation_list(service)) + '.\n' return help_text # Return True is service and operatio is valid, otherwise return False def _validate_service_and_operation(service, operation): if not service or not operation: return False if service not in supported_services_operations: return False if operation not in _get_operation_list(service): return False return True def _create_argparser(): parser = argparse.ArgumentParser(formatter_class=RawTextHelpFormatter) parser.add_argument('service', nargs='?', help=_get_service_help_text()) parser.add_argument('operation', nargs='?', help=_get_operation_help_text()) parser.add_argument('--role-name', help='AWS IAM role name') parser.add_argument('--policy-name', help='AWS IAM role policy name') parser.add_argument('--policy-document', help='AWS IAM role policy document') parser.add_argument('--assume-role-policy-document', help='AWS IAM role policy document') parser.add_argument('--dry-run', help='AWS EC2 import image dry run', action='store_true') parser.add_argument('--description', help='AWS EC2 import image description') parser.add_argument('--disk-containers', help='AWS EC2 import image disk containers') parser.add_argument('--import-task-ids', help='AWS EC2 import image task IDs') parser.add_argument('--resources', help='AWS EC2 resources') parser.add_argument('--tags', help='AWS EC2 tags') parser.add_argument('--owners', help='AWS EC2 describe image owners') parser.add_argument('--filters', help='AWS EC2 describe image filters') parser.add_argument('--bucket', help='AWS S3 bucket name') parser.add_argument('--key', help='AWS S3 object key') parser.add_argument('--create-bucket-configuration', help='AWS S3 create bucket configuration string') parser.add_argument('--region', help='AWS region constraint') parser.add_argument('--version', help='Boto3 SDK version', action='store_true') return parser def main(): parser = _create_argparser() args = parser.parse_args() # Show version if args.version: print(get_boto3_version()) return EC_OK # Validate service and operation if not _validate_service_and_operation(args.service, args.operation): parser.print_help() return EC_BAD_USAGE # Search for operation and coresponding handler for operation_func in supported_services_operations[args.service]: if args.operation in operation_func: return operation_func[args.operation](args) # Operation not found, should not happen (should be filtered by _validate_service_and_operation()) return EC_ERROR if __name__ == '__main__': sys.exit(main())