#####################################################
# Copyright (c) Xuanyi Dong [GitHub D-X-Y], 2021.03 #
#####################################################

import os
import abc
import tempfile
import warnings
from typing import Optional, Union, Callable
import torch
import torch.nn as nn
from enum import Enum

import spaces

from .super_utils import IntSpaceType, BoolSpaceType
from .super_utils import LayerOrder, SuperRunMode
from .super_utils import TensorContainer
from .super_utils import ShapeContainer

# Keys into ``SuperModule._meta_info`` used by ``save_best`` / ``load_best``.
# BEST_DIR_KEY maps to the temporary directory (created on the first
# ``save_best`` call) in which the best checkpoint is written.
BEST_DIR_KEY = "best_model_dir"
# BEST_SCORE_KEY maps to the best score recorded by ``save_best`` so far.
BEST_SCORE_KEY = "best_model_score"


class SuperModule(abc.ABC, nn.Module):
    """This class equips the nn.Module class with the ability to apply AutoDL.

    Sub-classes must implement ``forward_raw`` and ``forward_candidate``;
    ``forward`` dispatches to one of them according to ``super_run_type``.
    """

    def __init__(self):
        super().__init__()
        self._super_run_type = SuperRunMode.Default
        self._abstract_child = None
        self._verbose = False
        # Free-form bookkeeping; save_best/load_best store their state here.
        self._meta_info = {}

    def set_super_run_type(self, super_run_type):
        """Set the run mode on this module and every SuperModule beneath it."""

        def _reset_super_run(m):
            if isinstance(m, SuperModule):
                m._super_run_type = super_run_type

        self.apply(_reset_super_run)

    def add_module(self, name: str, module: Optional[torch.nn.Module]) -> None:
        """Register ``module`` as a child named ``name``.

        A warning is emitted when ``module`` is a real module that is not a
        SuperModule. ``None`` is accepted silently because it is a legal
        placeholder for ``nn.Module.add_module``.
        """
        if module is not None and not isinstance(module, SuperModule):
            warnings.warn(
                "Add {:}:{:} module, which is not SuperModule, into {:}".format(
                    name, module.__class__.__name__, self.__class__.__name__
                )
                + "\n"
                + "It may cause some functions invalid."
            )
        super().add_module(name, module)

    def apply_verbose(self, verbose):
        """Set the verbose flag on this module and every SuperModule beneath it."""

        def _reset_verbose(m):
            if isinstance(m, SuperModule):
                m._verbose = verbose

        self.apply(_reset_verbose)

    def apply_candidate(self, abstract_child):
        """Store the candidate description used by ``forward_candidate``.

        Raises:
            ValueError: if ``abstract_child`` is not a ``spaces.VirtualNode``.
        """
        if not isinstance(abstract_child, spaces.VirtualNode):
            raise ValueError(
                "Invalid abstract child program: {:}".format(abstract_child)
            )
        self._abstract_child = abstract_child

    def get_w_container(self):
        """Return a TensorContainer holding all named parameters and buffers.

        Parameters are appended with the flag ``True`` and buffers with
        ``False`` (presumably "is a parameter" -- confirm in TensorContainer).
        """
        container = TensorContainer()
        for name, param in self.named_parameters():
            container.append(name, param, True)
        for name, buf in self.named_buffers():
            container.append(name, buf, False)
        return container

    def analyze_weights(self):
        """Print the name, shape, mean and std of every parameter."""
        with torch.no_grad():
            for name, param in self.named_parameters():
                shapestr = "[{:10s}] shape={:}".format(name, list(param.shape))
                # .item() yields plain floats so the format spec applies directly.
                statstr = "{:.2f} +- {:.2f}".format(
                    param.mean().item(), param.std().item()
                )
                # Separate the shape from the statistics so the line is readable.
                print(shapestr + " " + statstr)

    def numel(self, buffer=True):
        """Return the number of elements in all parameters.

        Args:
            buffer: when true, the elements of registered buffers are counted too.
        """
        total = sum(param.numel() for _, param in self.named_parameters())
        if buffer:
            total += sum(buf.numel() for _, buf in self.named_buffers())
        return total

    def save_best(self, score):
        """Save the state dict if ``score`` beats the best score seen so far.

        On the first call a temporary directory (suffix ``-xlayers``) is
        created to hold the checkpoint; it is not removed by this class.

        Returns:
            A ``(saved, best_score)`` tuple, where ``saved`` tells whether a
            new checkpoint was written by this call.
        """
        if BEST_DIR_KEY not in self._meta_info:
            self._meta_info[BEST_DIR_KEY] = tempfile.mkdtemp("-xlayers")
        best_score = self._meta_info.setdefault(BEST_SCORE_KEY, None)
        # Only a strictly better score (or the very first one) is saved.
        if best_score is None or best_score < score:
            best_save_path = os.path.join(
                self._meta_info[BEST_DIR_KEY],
                "best-{:}.pth".format(self.__class__.__name__),
            )
            self._meta_info[BEST_SCORE_KEY] = score
            torch.save(self.state_dict(), best_save_path)
            return True, self._meta_info[BEST_SCORE_KEY]
        return False, self._meta_info[BEST_SCORE_KEY]

    def load_best(self):
        """Load the checkpoint written by the last successful ``save_best``.

        Raises:
            ValueError: if ``save_best`` has not been called on this module.
        """
        if BEST_DIR_KEY not in self._meta_info or BEST_SCORE_KEY not in self._meta_info:
            raise ValueError("Please call save_best at first")
        best_save_path = os.path.join(
            self._meta_info[BEST_DIR_KEY],
            "best-{:}.pth".format(self.__class__.__name__),
        )
        state_dict = torch.load(best_save_path)
        self.load_state_dict(state_dict)

    @property
    def abstract_search_space(self):
        """Search space of this module; sub-classes are expected to override."""
        raise NotImplementedError

    @property
    def super_run_type(self):
        """The run mode currently selected for ``forward``."""
        return self._super_run_type

    @property
    def abstract_child(self):
        """The candidate set via ``apply_candidate`` (``None`` if unset)."""
        return self._abstract_child

    @property
    def verbose(self):
        """Whether ``forward`` prints input/output shapes."""
        return self._verbose

    @abc.abstractmethod
    def forward_raw(self, *inputs):
        """Use the largest candidate for forward. Similar to the original PyTorch model."""
        raise NotImplementedError

    @abc.abstractmethod
    def forward_candidate(self, *inputs):
        """Forward pass used when the run mode is ``SuperRunMode.Candidate``."""
        raise NotImplementedError

    @property
    def name_with_id(self):
        """A ``name=<class>, id=<id>`` string identifying this instance."""
        return "name={:}, id={:}".format(self.__class__.__name__, id(self))

    def get_shape_str(self, tensors):
        """Return the shape(s) of ``tensors`` as a string.

        Lists/tuples are handled recursively and joined with ``", "``.

        Raises:
            TypeError: if a leaf is neither a tensor nor a list/tuple.
        """
        if isinstance(tensors, (list, tuple)):
            shapes = [self.get_shape_str(tensor) for tensor in tensors]
            if len(shapes) == 1:
                return shapes[0]
            return ", ".join(shapes)
        if isinstance(tensors, (torch.Tensor, nn.Parameter)):
            return str(tuple(tensors.shape))
        raise TypeError("Invalid input type: {:}.".format(type(tensors)))

    def forward(self, *inputs):
        """Dispatch to ``forward_raw`` or ``forward_candidate`` by run mode.

        When ``verbose`` is set, the input and output shapes are printed.

        Raises:
            ValueError: if ``super_run_type`` is neither ``FullModel`` nor
                ``Candidate``.
        """
        if self.verbose:
            print(
                "[{:}] inputs shape: {:}".format(
                    self.name_with_id, self.get_shape_str(inputs)
                )
            )
        if self.super_run_type == SuperRunMode.FullModel:
            outputs = self.forward_raw(*inputs)
        elif self.super_run_type == SuperRunMode.Candidate:
            outputs = self.forward_candidate(*inputs)
        else:
            # ValueError matches the other validation errors in this class.
            raise ValueError(
                "Unknown Super Model Run Mode: {:}".format(self.super_run_type)
            )
        if self.verbose:
            print(
                "[{:}] outputs shape: {:}".format(
                    self.name_with_id, self.get_shape_str(outputs)
                )
            )
        return outputs

    # NOTE: the mutable ``prefix=[]`` default is kept for signature
    # compatibility; it is never mutated here and overriding implementations
    # should not mutate it in place either.
    def forward_with_container(self, inputs, container, prefix=[]):
        """Forward using weights from ``container``; sub-classes must override."""
        raise NotImplementedError