| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407 |
- """
- This module defines an OpenAPIOperation class, a Connexion operation specific for OpenAPI 3 specs.
- """
- import logging
- import warnings
- from copy import copy, deepcopy
- from connexion.operations.abstract import AbstractOperation
- from ..decorators.uri_parsing import OpenAPIURIParser
- from ..http_facts import FORM_CONTENT_TYPES
- from ..utils import deep_get, deep_merge, is_null, is_nullable, make_type
- logger = logging.getLogger("connexion.operations.openapi3")
- class OpenAPIOperation(AbstractOperation):
- """
- A single API operation on a path.
- """
- def __init__(self, api, method, path, operation, resolver, path_parameters=None,
- app_security=None, components=None, validate_responses=False,
- strict_validation=False, randomize_endpoint=None, validator_map=None,
- pythonic_params=False, uri_parser_class=None, pass_context_arg_name=None):
- """
- This class uses the OperationID identify the module and function that will handle the operation
- From Swagger Specification:
- **OperationID**
- A friendly name for the operation. The id MUST be unique among all operations described in the API.
- Tools and libraries MAY use the operation id to uniquely identify an operation.
- :param method: HTTP method
- :type method: str
- :param path:
- :type path: str
- :param operation: swagger operation object
- :type operation: dict
- :param resolver: Callable that maps operationID to a function
- :param path_parameters: Parameters defined in the path level
- :type path_parameters: list
- :param app_security: list of security rules the application uses by default
- :type app_security: list
- :param components: `Components Object
- <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.1.md#componentsObject>`_
- :type components: dict
- :param validate_responses: True enables validation. Validation errors generate HTTP 500 responses.
- :type validate_responses: bool
- :param strict_validation: True enables validation on invalid request parameters
- :type strict_validation: bool
- :param randomize_endpoint: number of random characters to append to operation name
- :type randomize_endpoint: integer
- :param validator_map: Custom validators for the types "parameter", "body" and "response".
- :type validator_map: dict
- :param pythonic_params: When True CamelCase parameters are converted to snake_case and an underscore is appended
- to any shadowed built-ins
- :type pythonic_params: bool
- :param uri_parser_class: class to use for uri parsing
- :type uri_parser_class: AbstractURIParser
- :param pass_context_arg_name: If not None will try to inject the request context to the function using this
- name.
- :type pass_context_arg_name: str|None
- """
- self.components = components or {}
- def component_get(oas3_name):
- return self.components.get(oas3_name, {})
- # operation overrides globals
- security_schemes = component_get('securitySchemes')
- app_security = operation.get('security', app_security)
- uri_parser_class = uri_parser_class or OpenAPIURIParser
- self._router_controller = operation.get('x-openapi-router-controller')
- super().__init__(
- api=api,
- method=method,
- path=path,
- operation=operation,
- resolver=resolver,
- app_security=app_security,
- security_schemes=security_schemes,
- validate_responses=validate_responses,
- strict_validation=strict_validation,
- randomize_endpoint=randomize_endpoint,
- validator_map=validator_map,
- pythonic_params=pythonic_params,
- uri_parser_class=uri_parser_class,
- pass_context_arg_name=pass_context_arg_name
- )
- self._definitions_map = {
- 'components': {
- 'schemas': component_get('schemas'),
- 'examples': component_get('examples'),
- 'requestBodies': component_get('requestBodies'),
- 'parameters': component_get('parameters'),
- 'securitySchemes': component_get('securitySchemes'),
- 'responses': component_get('responses'),
- 'headers': component_get('headers'),
- }
- }
- self._request_body = operation.get('requestBody', {})
- self._parameters = operation.get('parameters', [])
- if path_parameters:
- self._parameters += path_parameters
- self._responses = operation.get('responses', {})
- # TODO figure out how to support multiple mimetypes
- # NOTE we currently just combine all of the possible mimetypes,
- # but we need to refactor to support mimetypes by response code
- response_content_types = []
- for _, defn in self._responses.items():
- response_content_types += defn.get('content', {}).keys()
- self._produces = response_content_types or ['application/json']
- request_content = self._request_body.get('content', {})
- self._consumes = list(request_content.keys()) or ['application/json']
- logger.debug('consumes: %s' % self.consumes)
- logger.debug('produces: %s' % self.produces)
- @classmethod
- def from_spec(cls, spec, api, path, method, resolver, *args, **kwargs):
- return cls(
- api,
- method,
- path,
- spec.get_operation(path, method),
- resolver=resolver,
- path_parameters=spec.get_path_params(path),
- app_security=spec.security,
- components=spec.components,
- *args,
- **kwargs
- )
- @property
- def request_body(self):
- return self._request_body
- @property
- def parameters(self):
- return self._parameters
- @property
- def consumes(self):
- return self._consumes
- @property
- def produces(self):
- return self._produces
- def with_definitions(self, schema):
- if self.components:
- schema['schema']['components'] = self.components
- return schema
- def response_schema(self, status_code=None, content_type=None):
- response_definition = self.response_definition(
- status_code, content_type
- )
- content_definition = response_definition.get("content", response_definition)
- content_definition = content_definition.get(content_type, content_definition)
- if "schema" in content_definition:
- return self.with_definitions(content_definition).get("schema", {})
- return {}
- def example_response(self, status_code=None, content_type=None):
- """
- Returns example response from spec
- """
- # simply use the first/lowest status code, this is probably 200 or 201
- status_code = status_code or sorted(self._responses.keys())[0]
- content_type = content_type or self.get_mimetype()
- examples_path = [str(status_code), 'content', content_type, 'examples']
- example_path = [str(status_code), 'content', content_type, 'example']
- schema_example_path = [
- str(status_code), 'content', content_type, 'schema', 'example'
- ]
- schema_path = [str(status_code), 'content', content_type, 'schema']
- try:
- status_code = int(status_code)
- except ValueError:
- status_code = 200
- try:
- # TODO also use example header?
- return (
- list(deep_get(self._responses, examples_path).values())[0]['value'],
- status_code
- )
- except (KeyError, IndexError):
- pass
- try:
- return (deep_get(self._responses, example_path), status_code)
- except KeyError:
- pass
- try:
- return (deep_get(self._responses, schema_example_path),
- status_code)
- except KeyError:
- pass
- try:
- return (self._nested_example(deep_get(self._responses, schema_path)),
- status_code)
- except KeyError:
- return (None, status_code)
- def _nested_example(self, schema):
- try:
- return schema["example"]
- except KeyError:
- pass
- try:
- # Recurse if schema is an object
- return {key: self._nested_example(value)
- for (key, value) in schema["properties"].items()}
- except KeyError:
- pass
- try:
- # Recurse if schema is an array
- return [self._nested_example(schema["items"])]
- except KeyError:
- raise
- def get_path_parameter_types(self):
- types = {}
- path_parameters = (p for p in self.parameters if p["in"] == "path")
- for path_defn in path_parameters:
- path_schema = path_defn["schema"]
- if path_schema.get('type') == 'string' and path_schema.get('format') == 'path':
- # path is special case for type 'string'
- path_type = 'path'
- else:
- path_type = path_schema.get('type')
- types[path_defn['name']] = path_type
- return types
- @property
- def body_schema(self):
- """
- The body schema definition for this operation.
- """
- return self.body_definition.get('schema', {})
- @property
- def body_definition(self):
- """
- The body complete definition for this operation.
- **There can be one "body" parameter at most.**
- :rtype: dict
- """
- if self._request_body:
- if len(self.consumes) > 1:
- logger.warning(
- 'this operation accepts multiple content types, using %s',
- self.consumes[0])
- res = self._request_body.get('content', {}).get(self.consumes[0], {})
- return self.with_definitions(res)
- return {}
- def _get_body_argument(self, body, arguments, has_kwargs, sanitize):
- if len(arguments) <= 0 and not has_kwargs:
- return {}
- # get the deprecated name from the body-schema for legacy connexion compat
- x_body_name = sanitize(self.body_schema.get('x-body-name'))
- if x_body_name:
- warnings.warn('x-body-name within the requestBody schema will be deprecated in the '
- 'next major version. It should be provided directly under '
- 'the requestBody instead.', DeprecationWarning)
- # prefer the x-body-name as an extension of requestBody, fallback to deprecated schema name, default 'body'
- x_body_name = sanitize(self.request_body.get('x-body-name', x_body_name or 'body'))
- if self.consumes[0] in FORM_CONTENT_TYPES:
- result = self._get_body_argument_form(body)
- else:
- result = self._get_body_argument_json(body)
- if x_body_name in arguments or has_kwargs:
- return {x_body_name: result}
- return {}
- def _get_body_argument_json(self, body):
- # if the body came in null, and the schema says it can be null, we decide
- # to include no value for the body argument, rather than the default body
- if is_nullable(self.body_schema) and is_null(body):
- return None
- if body is None:
- default_body = self.body_schema.get('default', {})
- return deepcopy(default_body)
- return body
- def _get_body_argument_form(self, body):
- # now determine the actual value for the body (whether it came in or is default)
- default_body = self.body_schema.get('default', {})
- body_props = {k: {"schema": v} for k, v
- in self.body_schema.get("properties", {}).items()}
- # by OpenAPI specification `additionalProperties` defaults to `true`
- # see: https://github.com/OAI/OpenAPI-Specification/blame/3.0.2/versions/3.0.2.md#L2305
- additional_props = self.body_schema.get("additionalProperties", True)
- body_arg = deepcopy(default_body)
- body_arg.update(body or {})
- if body_props or additional_props:
- return self._get_typed_body_values(body_arg, body_props, additional_props)
- return {}
- def _get_typed_body_values(self, body_arg, body_props, additional_props):
- """
- Return a copy of the provided body_arg dictionary
- whose values will have the appropriate types
- as defined in the provided schemas.
- :type body_arg: type dict
- :type body_props: dict
- :type additional_props: dict|bool
- :rtype: dict
- """
- additional_props_defn = {"schema": additional_props} if isinstance(additional_props, dict) else None
- res = {}
- for key, value in body_arg.items():
- try:
- prop_defn = body_props[key]
- res[key] = self._get_val_from_param(value, prop_defn)
- except KeyError: # pragma: no cover
- if not additional_props:
- logger.error(f"Body property '{key}' not defined in body schema")
- continue
- if additional_props_defn is not None:
- value = self._get_val_from_param(value, additional_props_defn)
- res[key] = value
- return res
- def _build_default_obj_recursive(self, _properties, res):
- """ takes disparate and nested default keys, and builds up a default object
- """
- for key, prop in _properties.items():
- if 'default' in prop and key not in res:
- res[key] = copy(prop['default'])
- elif prop.get('type') == 'object' and 'properties' in prop:
- res.setdefault(key, {})
- res[key] = self._build_default_obj_recursive(prop['properties'], res[key])
- return res
- def _get_default_obj(self, schema):
- try:
- return deepcopy(schema["default"])
- except KeyError:
- _properties = schema.get("properties", {})
- return self._build_default_obj_recursive(_properties, {})
- def _get_query_defaults(self, query_defns):
- defaults = {}
- for k, v in query_defns.items():
- try:
- if v["schema"]["type"] == "object":
- defaults[k] = self._get_default_obj(v["schema"])
- else:
- defaults[k] = v["schema"]["default"]
- except KeyError:
- pass
- return defaults
- def _get_query_arguments(self, query, arguments, has_kwargs, sanitize):
- query_defns = {p["name"]: p
- for p in self.parameters
- if p["in"] == "query"}
- default_query_params = self._get_query_defaults(query_defns)
- query_arguments = deepcopy(default_query_params)
- query_arguments = deep_merge(query_arguments, query)
- return self._query_args_helper(query_defns, query_arguments,
- arguments, has_kwargs, sanitize)
- def _get_val_from_param(self, value, query_defn):
- query_schema = query_defn["schema"]
- if is_nullable(query_schema) and is_null(value):
- return None
- if query_schema["type"] == "array":
- return [make_type(part, query_schema["items"]["type"]) for part in value]
- else:
- return make_type(value, query_schema["type"])
|