""" This module defines an OpenAPIOperation class, a Connexion operation specific for OpenAPI 3 specs. """ import logging import warnings from copy import copy, deepcopy from connexion.operations.abstract import AbstractOperation from ..decorators.uri_parsing import OpenAPIURIParser from ..http_facts import FORM_CONTENT_TYPES from ..utils import deep_get, deep_merge, is_null, is_nullable, make_type logger = logging.getLogger("connexion.operations.openapi3") class OpenAPIOperation(AbstractOperation): """ A single API operation on a path. """ def __init__(self, api, method, path, operation, resolver, path_parameters=None, app_security=None, components=None, validate_responses=False, strict_validation=False, randomize_endpoint=None, validator_map=None, pythonic_params=False, uri_parser_class=None, pass_context_arg_name=None): """ This class uses the OperationID identify the module and function that will handle the operation From Swagger Specification: **OperationID** A friendly name for the operation. The id MUST be unique among all operations described in the API. Tools and libraries MAY use the operation id to uniquely identify an operation. :param method: HTTP method :type method: str :param path: :type path: str :param operation: swagger operation object :type operation: dict :param resolver: Callable that maps operationID to a function :param path_parameters: Parameters defined in the path level :type path_parameters: list :param app_security: list of security rules the application uses by default :type app_security: list :param components: `Components Object `_ :type components: dict :param validate_responses: True enables validation. Validation errors generate HTTP 500 responses. :type validate_responses: bool :param strict_validation: True enables validation on invalid request parameters :type strict_validation: bool :param randomize_endpoint: number of random characters to append to operation name :type randomize_endpoint: integer :param validator_map: Custom validators for the types "parameter", "body" and "response". :type validator_map: dict :param pythonic_params: When True CamelCase parameters are converted to snake_case and an underscore is appended to any shadowed built-ins :type pythonic_params: bool :param uri_parser_class: class to use for uri parsing :type uri_parser_class: AbstractURIParser :param pass_context_arg_name: If not None will try to inject the request context to the function using this name. :type pass_context_arg_name: str|None """ self.components = components or {} def component_get(oas3_name): return self.components.get(oas3_name, {}) # operation overrides globals security_schemes = component_get('securitySchemes') app_security = operation.get('security', app_security) uri_parser_class = uri_parser_class or OpenAPIURIParser self._router_controller = operation.get('x-openapi-router-controller') super().__init__( api=api, method=method, path=path, operation=operation, resolver=resolver, app_security=app_security, security_schemes=security_schemes, validate_responses=validate_responses, strict_validation=strict_validation, randomize_endpoint=randomize_endpoint, validator_map=validator_map, pythonic_params=pythonic_params, uri_parser_class=uri_parser_class, pass_context_arg_name=pass_context_arg_name ) self._definitions_map = { 'components': { 'schemas': component_get('schemas'), 'examples': component_get('examples'), 'requestBodies': component_get('requestBodies'), 'parameters': component_get('parameters'), 'securitySchemes': component_get('securitySchemes'), 'responses': component_get('responses'), 'headers': component_get('headers'), } } self._request_body = operation.get('requestBody', {}) self._parameters = operation.get('parameters', []) if path_parameters: self._parameters += path_parameters self._responses = operation.get('responses', {}) # TODO figure out how to support multiple mimetypes # NOTE we currently just combine all of the possible mimetypes, # but we need to refactor to support mimetypes by response code response_content_types = [] for _, defn in self._responses.items(): response_content_types += defn.get('content', {}).keys() self._produces = response_content_types or ['application/json'] request_content = self._request_body.get('content', {}) self._consumes = list(request_content.keys()) or ['application/json'] logger.debug('consumes: %s' % self.consumes) logger.debug('produces: %s' % self.produces) @classmethod def from_spec(cls, spec, api, path, method, resolver, *args, **kwargs): return cls( api, method, path, spec.get_operation(path, method), resolver=resolver, path_parameters=spec.get_path_params(path), app_security=spec.security, components=spec.components, *args, **kwargs ) @property def request_body(self): return self._request_body @property def parameters(self): return self._parameters @property def consumes(self): return self._consumes @property def produces(self): return self._produces def with_definitions(self, schema): if self.components: schema['schema']['components'] = self.components return schema def response_schema(self, status_code=None, content_type=None): response_definition = self.response_definition( status_code, content_type ) content_definition = response_definition.get("content", response_definition) content_definition = content_definition.get(content_type, content_definition) if "schema" in content_definition: return self.with_definitions(content_definition).get("schema", {}) return {} def example_response(self, status_code=None, content_type=None): """ Returns example response from spec """ # simply use the first/lowest status code, this is probably 200 or 201 status_code = status_code or sorted(self._responses.keys())[0] content_type = content_type or self.get_mimetype() examples_path = [str(status_code), 'content', content_type, 'examples'] example_path = [str(status_code), 'content', content_type, 'example'] schema_example_path = [ str(status_code), 'content', content_type, 'schema', 'example' ] schema_path = [str(status_code), 'content', content_type, 'schema'] try: status_code = int(status_code) except ValueError: status_code = 200 try: # TODO also use example header? return ( list(deep_get(self._responses, examples_path).values())[0]['value'], status_code ) except (KeyError, IndexError): pass try: return (deep_get(self._responses, example_path), status_code) except KeyError: pass try: return (deep_get(self._responses, schema_example_path), status_code) except KeyError: pass try: return (self._nested_example(deep_get(self._responses, schema_path)), status_code) except KeyError: return (None, status_code) def _nested_example(self, schema): try: return schema["example"] except KeyError: pass try: # Recurse if schema is an object return {key: self._nested_example(value) for (key, value) in schema["properties"].items()} except KeyError: pass try: # Recurse if schema is an array return [self._nested_example(schema["items"])] except KeyError: raise def get_path_parameter_types(self): types = {} path_parameters = (p for p in self.parameters if p["in"] == "path") for path_defn in path_parameters: path_schema = path_defn["schema"] if path_schema.get('type') == 'string' and path_schema.get('format') == 'path': # path is special case for type 'string' path_type = 'path' else: path_type = path_schema.get('type') types[path_defn['name']] = path_type return types @property def body_schema(self): """ The body schema definition for this operation. """ return self.body_definition.get('schema', {}) @property def body_definition(self): """ The body complete definition for this operation. **There can be one "body" parameter at most.** :rtype: dict """ if self._request_body: if len(self.consumes) > 1: logger.warning( 'this operation accepts multiple content types, using %s', self.consumes[0]) res = self._request_body.get('content', {}).get(self.consumes[0], {}) return self.with_definitions(res) return {} def _get_body_argument(self, body, arguments, has_kwargs, sanitize): if len(arguments) <= 0 and not has_kwargs: return {} # get the deprecated name from the body-schema for legacy connexion compat x_body_name = sanitize(self.body_schema.get('x-body-name')) if x_body_name: warnings.warn('x-body-name within the requestBody schema will be deprecated in the ' 'next major version. It should be provided directly under ' 'the requestBody instead.', DeprecationWarning) # prefer the x-body-name as an extension of requestBody, fallback to deprecated schema name, default 'body' x_body_name = sanitize(self.request_body.get('x-body-name', x_body_name or 'body')) if self.consumes[0] in FORM_CONTENT_TYPES: result = self._get_body_argument_form(body) else: result = self._get_body_argument_json(body) if x_body_name in arguments or has_kwargs: return {x_body_name: result} return {} def _get_body_argument_json(self, body): # if the body came in null, and the schema says it can be null, we decide # to include no value for the body argument, rather than the default body if is_nullable(self.body_schema) and is_null(body): return None if body is None: default_body = self.body_schema.get('default', {}) return deepcopy(default_body) return body def _get_body_argument_form(self, body): # now determine the actual value for the body (whether it came in or is default) default_body = self.body_schema.get('default', {}) body_props = {k: {"schema": v} for k, v in self.body_schema.get("properties", {}).items()} # by OpenAPI specification `additionalProperties` defaults to `true` # see: https://github.com/OAI/OpenAPI-Specification/blame/3.0.2/versions/3.0.2.md#L2305 additional_props = self.body_schema.get("additionalProperties", True) body_arg = deepcopy(default_body) body_arg.update(body or {}) if body_props or additional_props: return self._get_typed_body_values(body_arg, body_props, additional_props) return {} def _get_typed_body_values(self, body_arg, body_props, additional_props): """ Return a copy of the provided body_arg dictionary whose values will have the appropriate types as defined in the provided schemas. :type body_arg: type dict :type body_props: dict :type additional_props: dict|bool :rtype: dict """ additional_props_defn = {"schema": additional_props} if isinstance(additional_props, dict) else None res = {} for key, value in body_arg.items(): try: prop_defn = body_props[key] res[key] = self._get_val_from_param(value, prop_defn) except KeyError: # pragma: no cover if not additional_props: logger.error(f"Body property '{key}' not defined in body schema") continue if additional_props_defn is not None: value = self._get_val_from_param(value, additional_props_defn) res[key] = value return res def _build_default_obj_recursive(self, _properties, res): """ takes disparate and nested default keys, and builds up a default object """ for key, prop in _properties.items(): if 'default' in prop and key not in res: res[key] = copy(prop['default']) elif prop.get('type') == 'object' and 'properties' in prop: res.setdefault(key, {}) res[key] = self._build_default_obj_recursive(prop['properties'], res[key]) return res def _get_default_obj(self, schema): try: return deepcopy(schema["default"]) except KeyError: _properties = schema.get("properties", {}) return self._build_default_obj_recursive(_properties, {}) def _get_query_defaults(self, query_defns): defaults = {} for k, v in query_defns.items(): try: if v["schema"]["type"] == "object": defaults[k] = self._get_default_obj(v["schema"]) else: defaults[k] = v["schema"]["default"] except KeyError: pass return defaults def _get_query_arguments(self, query, arguments, has_kwargs, sanitize): query_defns = {p["name"]: p for p in self.parameters if p["in"] == "query"} default_query_params = self._get_query_defaults(query_defns) query_arguments = deepcopy(default_query_params) query_arguments = deep_merge(query_arguments, query) return self._query_args_helper(query_defns, query_arguments, arguments, has_kwargs, sanitize) def _get_val_from_param(self, value, query_defn): query_schema = query_defn["schema"] if is_nullable(query_schema) and is_null(value): return None if query_schema["type"] == "array": return [make_type(part, query_schema["items"]["type"]) for part in value] else: return make_type(value, query_schema["type"])