import inspect
import logging
import textwrap
from collections import OrderedDict
from decimal import Decimal

import pytz
from django.db import models
from django.utils.encoding import force_str
from rest_framework import serializers, status
from rest_framework.mixins import (
    DestroyModelMixin,
    ListModelMixin,
    RetrieveModelMixin,
    UpdateModelMixin,
)
from rest_framework.parsers import FileUploadParser
from rest_framework.request import is_form_media_type
from rest_framework.settings import api_settings as rest_framework_settings
from rest_framework.utils import encoders, json
from rest_framework.views import APIView

from .app_settings import swagger_settings

try:
    import zoneinfo
except ImportError:
    try:
        from backports import zoneinfo
    except ImportError:
        zoneinfo = None


logger = logging.getLogger(__name__)


class no_body(object):
    """Used as a sentinel value to forcibly remove the body of a request via
    :func:`.swagger_auto_schema`."""

    pass


class unset(object):
    """Used as a sentinel value for function parameters not set by the caller where
    ``None`` would be a valid value."""

    pass


def swagger_auto_schema(
    method=None,
    methods=None,
    auto_schema=unset,
    request_body=None,
    query_serializer=None,
    manual_parameters=None,
    operation_id=None,
    operation_description=None,
    operation_summary=None,
    security=None,
    deprecated=None,
    responses=None,
    field_inspectors=None,
    filter_inspectors=None,
    paginator_inspectors=None,
    tags=None,
    produces=None,
    consumes=None,
    **extra_overrides,
):
    """Decorate a view method to customize the :class:`.Operation` object generated from
    it.

    `method` and `methods` are mutually exclusive and must only be present when
    decorating a view method that accepts more than one HTTP request method.

    The `auto_schema` and `operation_description` arguments take precedence over view-
    or method-level values.

    :param str method: for multi-method views, the http method the options should apply
        to
    :param list[str] methods: for multi-method views, the http methods the options
        should apply to
    :param drf_yasg.inspectors.SwaggerAutoSchema auto_schema: custom class to use for
        generating the Operation object;
        this overrides both the class-level ``swagger_schema`` attribute and the
        ``DEFAULT_AUTO_SCHEMA_CLASS`` setting, and can be set to ``None`` to prevent
        this operation from being generated
    :param request_body: custom request body which will be used as the ``schema``
        property of a :class:`.Parameter` with ``in: 'body'``.

        A Schema or SchemaRef is not valid if this request consumes form-data, because
        ``form`` and ``body`` parameters are mutually exclusive in an
        :class:`.Operation`. If you need to set custom ``form`` parameters, you can use
        the `manual_parameters` argument.

        If a ``Serializer`` class or instance is given, it will be automatically
        converted into a :class:`.Schema` used as a ``body`` :class:`.Parameter`, or
        into a list of ``form`` :class:`.Parameter`\\ s, as appropriate.
    :type request_body: drf_yasg.openapi.Schema or drf_yasg.openapi.SchemaRef or
        rest_framework.serializers.Serializer or type[no_body]

    :param rest_framework.serializers.Serializer query_serializer: if you use a
        ``Serializer`` to parse query parameters, you can pass it here and have
        :class:`.Parameter` objects be generated automatically from it.

        If any ``Field`` on the serializer cannot be represented as a ``query``
        :class:`.Parameter` (e.g. nested Serializers, file fields, ...), the schema
        generation will fail with an error.

        Schema generation will also fail if the name of any Field on the
        `query_serializer` conflicts with parameters generated by ``filter_backends`` or
        ``paginator``.

    :param list[drf_yasg.openapi.Parameter] manual_parameters: a list of manual
        parameters to override the automatically generated ones

        :class:`.Parameter`\\ s are identified by their (``name``, ``in``) combination,
            and any parameters given here will fully override automatically generated
            parameters if they collide.

        It is an error to supply ``form`` parameters when the request does not consume
        form-data.

    :param str operation_id: operation ID override; the operation ID must be unique
        across the whole API
    :param str operation_description: operation description override
    :param str operation_summary: operation summary string
    :param list[dict] security: security requirements override; used to specify which
        authentication mechanism is required to call this API; an empty list marks the
        endpoint as unauthenticated (i.e. removes all accepted authentication schemes),
        and ``None`` will inherit the top-level security requirements
    :param bool deprecated: deprecation status for operation
    :param responses: a dict of documented manual responses keyed on response status
        code. If no success (``2xx``) response is given, one will automatically be
        generated from the request body and http method. If any ``2xx`` response is
        given the automatic response is suppressed.

        * if a plain string is given as value, a :class:`.Response` with no body and
          that string as its description will be generated
        * if ``None`` is given as a value, the response is ignored; this is mainly
          useful for disabling default 2xx responses, i.e.
          ``responses={200: None, 302: 'something'}``
        * if a :class:`.Schema`, :class:`.SchemaRef` is given, a :class:`.Response`
          with the schema as its body and an empty description will be generated
        * a ``Serializer`` class or instance will be converted into a :class:`.Schema`
          and treated as above
        * a :class:`.Response` object will be used as-is; however if its ``schema``
          attribute is a ``Serializer``, it will automatically be converted into a
          :class:`.Schema`
    :type responses: dict[int or str, (drf_yasg.openapi.Schema or
        drf_yasg.openapi.SchemaRef or drf_yasg.openapi.Response or str or
        rest_framework.serializers.Serializer)]

    :param list[type[drf_yasg.inspectors.FieldInspector]] field_inspectors: extra
        serializer and field inspectors; these will be tried before
        :attr:`.ViewInspector.field_inspectors` on the
        :class:`.inspectors.SwaggerAutoSchema`
    :param list[type[drf_yasg.inspectors.FilterInspector]] filter_inspectors: extra
        filter inspectors; these will be tried before
        :attr:`.ViewInspector.filter_inspectors` on the
        :class:`.inspectors.SwaggerAutoSchema`
    :param list[type[drf_yasg.inspectors.PaginatorInspector]] paginator_inspectors:
        extra paginator inspectors; these will be tried before
        :attr:`.ViewInspector.paginator_inspectors` on the
        :class:`.inspectors.SwaggerAutoSchema`
    :param list[str] tags: tags override
    :param list[str] produces: produces override
    :param list[str] consumes: consumes override
    :param extra_overrides: extra values that will be saved into the ``overrides`` dict;
        these values will be available in the handling
        :class:`.inspectors.SwaggerAutoSchema` instance via ``self.overrides``
    """

    def decorator(view_method):
        assert not any(hm in extra_overrides for hm in APIView.http_method_names), (
            "HTTP method names not allowed here"
        )
        data = {
            "consumes": consumes,
            "deprecated": deprecated,
            "field_inspectors": list(field_inspectors) if field_inspectors else None,
            "filter_inspectors": list(filter_inspectors) if filter_inspectors else None,
            "manual_parameters": manual_parameters,
            "operation_description": operation_description,
            "operation_id": operation_id,
            "operation_summary": operation_summary,
            "paginator_inspectors": list(paginator_inspectors)
            if paginator_inspectors
            else None,
            "produces": produces,
            "query_serializer": query_serializer,
            "request_body": request_body,
            "responses": responses,
            "security": security,
            "tags": list(tags) if tags else None,
        }
        data = filter_none(data)
        if auto_schema is not unset:
            data["auto_schema"] = auto_schema
        data.update(extra_overrides)
        if not data:  # pragma: no cover
            # no overrides to set, no use in doing more work
            return view_method

        # if the method is an @action, it will have a bind_to_methods attribute, or a
        # mapping attribute for drf>3.8
        bind_to_methods = getattr(view_method, "bind_to_methods", [])
        mapping = getattr(view_method, "mapping", {})
        mapping_methods = [
            mth for mth, name in mapping.items() if name == view_method.__name__
        ]
        action_http_methods = bind_to_methods + mapping_methods

        # if the method is actually a function based view (@api_view), it will have a
        # 'cls' attribute
        view_cls = getattr(view_method, "cls", None)
        api_view_http_methods = [
            m
            for m in getattr(view_cls, "http_method_names", [])
            if hasattr(view_cls, m)
        ]

        available_http_methods = api_view_http_methods + action_http_methods
        existing_data = getattr(view_method, "_swagger_auto_schema", {})

        _methods = methods
        if methods or method:
            assert available_http_methods, (
                "`method` or `methods` can only be specified on @action or @api_view "
                "views"
            )
            assert bool(methods) != bool(method), "specify either method or methods"
            assert not isinstance(methods, str), (
                "`methods` expects to receive a list of methods;"
                " use `method` for a single argument"
            )
            if method:
                _methods = [method.lower()]
            else:
                _methods = [mth.lower() for mth in methods]
            assert all(mth in available_http_methods for mth in _methods), (
                "http method not bound to view"
            )
            assert not any(mth in existing_data for mth in _methods), (
                "http method defined multiple times"
            )

        if available_http_methods:
            # action or api_view
            assert bool(api_view_http_methods) != bool(action_http_methods), (
                "this should never happen"
            )

            if len(available_http_methods) > 1:
                assert _methods, (
                    "on multi-method api_view or action, you must specify "
                    "swagger_auto_schema on a per-method basis using one of the "
                    "`method` or `methods` arguments"
                )
            else:
                # for a single-method view we assume that single method as the decorator
                # target
                _methods = _methods or available_http_methods

            assert not any(
                hasattr(getattr(view_cls, mth, None), "_swagger_auto_schema")
                for mth in _methods
            ), "swagger_auto_schema applied twice to method"
            assert not any(mth in existing_data for mth in _methods), (
                "swagger_auto_schema applied twice to method"
            )
            existing_data.update((mth.lower(), data) for mth in _methods)
            view_method._swagger_auto_schema = existing_data
        else:
            assert not _methods, (
                "the methods argument should only be specified when decorating an "
                "action; you should also ensure that you put the swagger_auto_schema "
                "decorator AFTER (above) the _route decorator"
            )
            assert not existing_data, "swagger_auto_schema applied twice to method"
            view_method._swagger_auto_schema = data

        return view_method

    return decorator


def swagger_serializer_method(serializer_or_field):
    """
    Decorates the method of a serializers.SerializerMethodField
    to hint as to how Swagger should be generated for this field.

    :param serializer_or_field: ``Serializer``/``Field`` class or instance
    :return:
    """

    def decorator(serializer_method):
        # stash the serializer for SerializerMethodFieldInspector to find
        serializer_method._swagger_serializer = serializer_or_field
        return serializer_method

    return decorator


def is_list_view(path, method, view):
    """Check if the given path/method appears to represent a list view (as opposed to a
    detail/instance view).

    :param str path: view path
    :param str method: http method
    :param APIView view: target view
    :rtype: bool
    """
    # for ViewSets, it could be the default 'list' action, or an @action(detail=False)
    action = getattr(view, "action", "")
    detail = getattr(view, "detail", None)
    suffix = getattr(view, "suffix", None)
    if action in ("list", "create") or detail is False or suffix == "List":
        return True

    if (
        action in ("retrieve", "update", "partial_update", "destroy")
        or method.lower() != "get"
        # a detail action is surely not a list route
        or detail is True
        or suffix == "Instance"
    ):
        return False

    if isinstance(view, ListModelMixin):
        return True

    # for GenericAPIView, if it's a detail view it can't also be a list view
    if isinstance(view, (RetrieveModelMixin, UpdateModelMixin, DestroyModelMixin)):
        return False

    # if the last component in the path is parameterized it's probably not a list view
    path_components = path.strip("/").split("/")
    if path_components and "{" in path_components[-1]:
        return False

    # otherwise assume it's a list view
    return True


def guess_response_status(method):
    if method == "post":
        return status.HTTP_201_CREATED
    elif method == "delete":
        return status.HTTP_204_NO_CONTENT
    else:
        return status.HTTP_200_OK


def param_list_to_odict(parameters):
    """Transform a list of :class:`.Parameter` objects into an ``OrderedDict`` keyed on
    the ``(name, in_)`` tuple of each parameter.

    Raises an ``AssertionError`` if `parameters` contains duplicate parameters (by their
    name + in combination).

    :param list[drf_yasg.openapi.Parameter] parameters: the list of parameters
    :return: `parameters` keyed by ``(name, in_)``
    :rtype: dict[(str,str),drf_yasg.openapi.Parameter]
    """
    result = OrderedDict(((param.name, param.in_), param) for param in parameters)
    assert len(result) == len(parameters), "duplicate Parameters found"
    return result


def merge_params(parameters, overrides):
    """Merge `overrides` into `parameters`. This is the same as appending `overrides` to
    `parameters`, but any element of `parameters` whose ``(name, in_)`` tuple collides
    with an element in `overrides` is replaced by it.

    Raises an ``AssertionError`` if either list contains duplicate parameters.

    :param list[drf_yasg.openapi.Parameter] parameters: initial parameters
    :param list[drf_yasg.openapi.Parameter] overrides: overriding parameters
    :return: merged list
    :rtype: list[drf_yasg.openapi.Parameter]
    """
    parameters = param_list_to_odict(parameters)
    parameters.update(param_list_to_odict(overrides))
    return list(parameters.values())


def filter_none(obj):
    """Remove ``None`` values from tuples, lists or dictionaries. Return other objects
    as-is.

    :param obj: the object
    :return: collection with ``None`` values removed
    """
    if obj is None:
        return None
    new_obj = None
    if isinstance(obj, dict):
        new_obj = type(obj)(
            (k, v) for k, v in obj.items() if k is not None and v is not None
        )
    if isinstance(obj, (list, tuple)):
        new_obj = type(obj)(v for v in obj if v is not None)
    if new_obj is not None and len(new_obj) != len(obj):
        return new_obj  # pragma: no cover
    return obj


def force_serializer_instance(serializer):
    """Force `serializer` into a ``Serializer`` instance. If it is not a ``Serializer``
    class or instance, raises an assertion error.

    :param serializer: serializer class or instance
    :type serializer: serializers.BaseSerializer or type[serializers.BaseSerializer]
    :return: serializer instance
    :rtype: serializers.BaseSerializer
    """
    if inspect.isclass(serializer):
        assert issubclass(serializer, serializers.BaseSerializer), (
            "Serializer required, not %s" % serializer.__name__
        )
        return serializer()

    assert isinstance(serializer, serializers.BaseSerializer), (
        "Serializer class or instance required, not %s" % type(serializer).__name__
    )
    return serializer


def get_serializer_class(serializer):
    """Given a ``Serializer`` class or instance, return the ``Serializer`` class.
    If `serializer` is not a ``Serializer`` class or instance, raises an assertion
    error.

    :param serializer: serializer class or instance, or ``None``
    :return: serializer class
    :rtype: type[serializers.BaseSerializer]
    """
    if serializer is None:
        return None

    if inspect.isclass(serializer):
        assert issubclass(serializer, serializers.BaseSerializer), (
            "Serializer required, not %s" % serializer.__name__
        )
        return serializer

    assert isinstance(serializer, serializers.BaseSerializer), (
        "Serializer class or instance required, not %s" % type(serializer).__name__
    )
    return type(serializer)


def get_object_classes(classes_or_instances, expected_base_class=None):
    """Given a list of instances or class objects, return the list of their classes.

    :param classes_or_instances: mixed list to parse
    :type classes_or_instances: list[type or object]
    :param expected_base_class: if given, only subclasses or instances of this type will
        be returned
    :type expected_base_class: type
    :return: list of classes
    :rtype: list
    """
    classes_or_instances = classes_or_instances or []
    result = []
    for obj in classes_or_instances:
        if inspect.isclass(obj):
            if not expected_base_class or issubclass(obj, expected_base_class):
                result.append(obj)
        else:
            if not expected_base_class or isinstance(obj, expected_base_class):
                result.append(type(obj))

    return result


def get_consumes(parser_classes):
    """Extract ``consumes`` MIME types from a list of parser classes.

    :param list parser_classes: parser classes
    :type parser_classes: list[rest_framework.parsers.BaseParser or
        type[rest_framework.parsers.BaseParser]]
    :return: MIME types for ``consumes``
    :rtype: list[str]
    """
    parser_classes = get_object_classes(parser_classes)
    parser_classes = [
        pc for pc in parser_classes if not issubclass(pc, FileUploadParser)
    ]
    media_types = [parser.media_type for parser in parser_classes or []]
    non_form_media_types = [
        encoding for encoding in media_types if not is_form_media_type(encoding)
    ]
    # Because swagger Parameter objects don't support complex data types
    # (nested objects, arrays), we can't use those unless we are sure the view *only*
    # accepts form data. This means that a view won't support file upload in swagger
    # unless it explicitly sets its parser classes to include only form parsers
    if len(non_form_media_types) == 0:
        return media_types

    # If the form accepts both form data and another type, like json (which is the
    # default config), we will render its input as a Schema and thus it file parameters
    # will be read-only
    return non_form_media_types


def get_produces(renderer_classes):
    """Extract ``produces`` MIME types from a list of renderer classes.

    :param list renderer_classes: renderer classes
    :type renderer_classes: list[rest_framework.renderers.BaseRenderer or
        type[rest_framework.renderers.BaseRenderer]]
    :return: MIME types for ``produces``
    :rtype: list[str]
    """
    renderer_classes = get_object_classes(renderer_classes)
    media_types = [renderer.media_type for renderer in renderer_classes or []]
    media_types = [
        encoding
        for encoding in media_types
        if not any(
            excluded in encoding for excluded in swagger_settings.EXCLUDED_MEDIA_TYPES
        )
    ]
    return media_types


def decimal_as_float(field):
    """Returns true if ``field`` is a django-rest-framework DecimalField and its
    ``coerce_to_string`` attribute or the ``COERCE_DECIMAL_TO_STRING`` setting is set to
    ``False``.

    :rtype: bool
    """
    if isinstance(field, serializers.DecimalField) or isinstance(
        field, models.DecimalField
    ):
        return not getattr(
            field, "coerce_to_string", rest_framework_settings.COERCE_DECIMAL_TO_STRING
        )
    return False


def get_serializer_ref_name(serializer):
    """Get serializer's ref_name (or None for ModelSerializer if it is named
    'NestedSerializer')

    :param serializer: Serializer instance
    :return: Serializer's ``ref_name`` or ``None`` for inline serializer
    :rtype: str or None
    """
    serializer_meta = getattr(serializer, "Meta", None)
    serializer_name = type(serializer).__name__
    if hasattr(serializer_meta, "ref_name"):
        ref_name = serializer_meta.ref_name
    elif serializer_name == "NestedSerializer" and isinstance(
        serializer, serializers.ModelSerializer
    ):
        logger.debug(
            "Forcing inline output for ModelSerializer named 'NestedSerializer':\n"
            + str(serializer)
        )
        ref_name = None
    else:
        ref_name = serializer_name
        if ref_name.endswith("Serializer"):
            ref_name = ref_name[: -len("Serializer")]
    return ref_name


def force_real_str(s, encoding="utf-8", strings_only=False, errors="strict"):
    """
    Force `s` into a ``str`` instance.

    Fix for https://github.com/axnsan12/drf-yasg/issues/159
    """
    if s is not None:
        s = force_str(s, encoding, strings_only, errors)
        if not isinstance(s, str):
            s = "" + s

        # Remove common indentation to get the correct Markdown rendering
        s = textwrap.dedent(s)

    return s


def field_value_to_representation(field, value):
    """Convert a python value related to a field (default, choices, etc.) into its
    OpenAPI-compatible representation.

    :param serializers.Field field: field associated with the value
    :param object value: value
    :return: the converted value
    """
    value = field.to_representation(value)
    if isinstance(value, Decimal):
        if decimal_as_float(field):
            value = float(value)
        else:
            value = str(value)

    elif isinstance(value, pytz.BaseTzInfo):
        value = str(value)

    elif zoneinfo is not None and isinstance(value, zoneinfo.ZoneInfo):
        value = str(value)

    # JSON roundtrip ensures that the value is valid JSON;
    # for example, sets and tuples get transformed into lists
    return json.loads(json.dumps(value, cls=encoders.JSONEncoder))


def get_field_default(field):
    """
    Get the default value for a field, converted to a JSON-compatible value while
    properly handling callables.

    :param field: field instance
    :return: default value
    """
    default = getattr(field, "default", serializers.empty)
    if default is not serializers.empty:
        if callable(default):
            try:
                if hasattr(default, "set_context"):
                    default.set_context(field)
                if getattr(default, "requires_context", False):
                    default = default(field)
                else:
                    default = default()
            except Exception:  # pragma: no cover
                logger.warning(
                    "default for %s is callable but it raised an exception when "
                    "called; 'default' will not be set on schema",
                    field,
                    exc_info=True,
                )
                default = serializers.empty

        if default is not serializers.empty and default is not None:
            try:
                default = field_value_to_representation(field, default)
            except Exception:  # pragma: no cover
                logger.warning(
                    "'default' on schema for %s will not be set because "
                    "to_representation raised an exception",
                    field,
                    exc_info=True,
                )
                default = serializers.empty

    return default
