田运杰 10 months ago
Parent
Commit
a9972cde06

+ 29 - 0
ultralytics/nn/__init__.py

@@ -0,0 +1,29 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+
+from .tasks import (
+    BaseModel,
+    ClassificationModel,
+    DetectionModel,
+    SegmentationModel,
+    attempt_load_one_weight,
+    attempt_load_weights,
+    guess_model_scale,
+    guess_model_task,
+    parse_model,
+    torch_safe_load,
+    yaml_model_load,
+)
+
+__all__ = (
+    "attempt_load_one_weight",
+    "attempt_load_weights",
+    "parse_model",
+    "yaml_model_load",
+    "guess_model_task",
+    "guess_model_scale",
+    "torch_safe_load",
+    "DetectionModel",
+    "SegmentationModel",
+    "ClassificationModel",
+    "BaseModel",
+)

+ 763 - 0
ultralytics/nn/autobackend.py

@@ -0,0 +1,763 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+
+import ast
+import json
+import platform
+import zipfile
+from collections import OrderedDict, namedtuple
+from pathlib import Path
+
+import cv2
+import numpy as np
+import torch
+import torch.nn as nn
+from PIL import Image
+
+from ultralytics.utils import ARM64, IS_JETSON, IS_RASPBERRYPI, LINUX, LOGGER, ROOT, yaml_load
+from ultralytics.utils.checks import check_requirements, check_suffix, check_version, check_yaml
+from ultralytics.utils.downloads import attempt_download_asset, is_url
+
+
+def check_class_names(names):
+    """
+    Check class names.
+
+    Map imagenet class codes to human-readable names if required. Convert lists to dicts.
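+
+    Examples (illustrating the conversions performed below):
+        >>> check_class_names(["person", "bicycle"])  # list -> dict
+        {0: 'person', 1: 'bicycle'}
+        >>> check_class_names({"0": "person", 1: True})  # str keys -> int, non-str values -> str
+        {0: 'person', 1: 'True'}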
+    """
+    if isinstance(names, list):  # names is a list
+        names = dict(enumerate(names))  # convert to dict
+    if isinstance(names, dict):
+        # Convert 1) string keys to int, i.e. '0' to 0, and non-string values to strings, i.e. True to 'True'
+        names = {int(k): str(v) for k, v in names.items()}
+        n = len(names)
+        if max(names.keys()) >= n:
+            raise KeyError(
+                f"{n}-class dataset requires class indices 0-{n - 1}, but you have invalid class indices "
+                f"{min(names.keys())}-{max(names.keys())} defined in your dataset YAML."
+            )
+        if isinstance(names[0], str) and names[0].startswith("n0"):  # imagenet class codes, i.e. 'n01440764'
+            names_map = yaml_load(ROOT / "cfg/datasets/ImageNet.yaml")["map"]  # human-readable names
+            names = {k: names_map[v] for k, v in names.items()}
+    return names
+
+
+def default_class_names(data=None):
+    """Applies default class names to an input YAML file or returns numerical class names."""
+    if data:
+        try:
+            return yaml_load(check_yaml(data))["names"]
+        except Exception:
+            pass
+    return {i: f"class{i}" for i in range(999)}  # return default if above errors
+
+
+class AutoBackend(nn.Module):
+    """
+    Handles dynamic backend selection for running inference using Ultralytics YOLO models.
+
+    The AutoBackend class is designed to provide an abstraction layer for various inference engines. It supports a wide
+    range of formats, each with specific naming conventions as outlined below:
+
+        Supported Formats and Naming Conventions:
+            | Format                | File Suffix       |
+            |-----------------------|-------------------|
+            | PyTorch               | *.pt              |
+            | TorchScript           | *.torchscript     |
+            | ONNX Runtime          | *.onnx            |
+            | ONNX OpenCV DNN       | *.onnx (dnn=True) |
+            | OpenVINO              | *openvino_model/  |
+            | CoreML                | *.mlpackage       |
+            | TensorRT              | *.engine          |
+            | TensorFlow SavedModel | *_saved_model/    |
+            | TensorFlow GraphDef   | *.pb              |
+            | TensorFlow Lite       | *.tflite          |
+            | TensorFlow Edge TPU   | *_edgetpu.tflite  |
+            | PaddlePaddle          | *_paddle_model/   |
+            | MNN                   | *.mnn             |
+            | NCNN                  | *_ncnn_model/     |
+
+    This class offers dynamic backend switching capabilities based on the input model format, making it easier to deploy
+    models across various platforms.
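+
+    Examples:
+        A minimal usage sketch (assuming a local 'yolo11n.pt' checkpoint):
+        >>> model = AutoBackend(weights="yolo11n.pt", device=torch.device("cpu"))
+        >>> model.warmup()  # optional dummy forward pass
+        >>> y = model(torch.zeros(1, 3, 640, 640))  # BCHW input tensor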
+    """
+
+    @torch.no_grad()
+    def __init__(
+        self,
+        weights="yolo11n.pt",
+        device=torch.device("cpu"),
+        dnn=False,
+        data=None,
+        fp16=False,
+        batch=1,
+        fuse=True,
+        verbose=True,
+    ):
+        """
+        Initialize the AutoBackend for inference.
+
+        Args:
+            weights (str | torch.nn.Module): Path to the model weights file or a module instance. Defaults to 'yolo11n.pt'.
+            device (torch.device): Device to run the model on. Defaults to CPU.
+            dnn (bool): Use OpenCV DNN module for ONNX inference. Defaults to False.
+            data (str | Path | optional): Path to the additional data.yaml file containing class names. Optional.
+            fp16 (bool): Enable half-precision inference. Supported only on specific backends. Defaults to False.
+            batch (int): Batch-size to assume for inference.
+            fuse (bool): Fuse Conv2D + BatchNorm layers for optimization. Defaults to True.
+            verbose (bool): Enable verbose logging. Defaults to True.
+        """
+        super().__init__()
+        w = str(weights[0] if isinstance(weights, list) else weights)
+        nn_module = isinstance(weights, torch.nn.Module)
+        (
+            pt,
+            jit,
+            onnx,
+            xml,
+            engine,
+            coreml,
+            saved_model,
+            pb,
+            tflite,
+            edgetpu,
+            tfjs,
+            paddle,
+            mnn,
+            ncnn,
+            imx,
+            triton,
+        ) = self._model_type(w)
+        fp16 &= pt or jit or onnx or xml or engine or nn_module or triton  # FP16
+        nhwc = coreml or saved_model or pb or tflite or edgetpu  # BHWC formats (vs torch BCHW)
+        stride = 32  # default stride
+        model, metadata, task = None, None, None
+
+        # Set device
+        cuda = torch.cuda.is_available() and device.type != "cpu"  # use CUDA
+        if cuda and not any([nn_module, pt, jit, engine, onnx, paddle]):  # GPU dataloader formats
+            device = torch.device("cpu")
+            cuda = False
+
+        # Download if not local
+        if not (pt or triton or nn_module):
+            w = attempt_download_asset(w)
+
+        # In-memory PyTorch model
+        if nn_module:
+            model = weights.to(device)
+            if fuse:
+                model = model.fuse(verbose=verbose)
+            if hasattr(model, "kpt_shape"):
+                kpt_shape = model.kpt_shape  # pose-only
+            stride = max(int(model.stride.max()), 32)  # model stride
+            names = model.module.names if hasattr(model, "module") else model.names  # get class names
+            model.half() if fp16 else model.float()
+            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
+            pt = True
+
+        # PyTorch
+        elif pt:
+            from ultralytics.nn.tasks import attempt_load_weights
+
+            model = attempt_load_weights(
+                weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse
+            )
+            if hasattr(model, "kpt_shape"):
+                kpt_shape = model.kpt_shape  # pose-only
+            stride = max(int(model.stride.max()), 32)  # model stride
+            names = model.module.names if hasattr(model, "module") else model.names  # get class names
+            model.half() if fp16 else model.float()
+            self.model = model  # explicitly assign for to(), cpu(), cuda(), half()
+
+        # TorchScript
+        elif jit:
+            LOGGER.info(f"Loading {w} for TorchScript inference...")
+            extra_files = {"config.txt": ""}  # model metadata
+            model = torch.jit.load(w, _extra_files=extra_files, map_location=device)
+            model.half() if fp16 else model.float()
+            if extra_files["config.txt"]:  # load metadata dict
+                metadata = json.loads(extra_files["config.txt"], object_hook=lambda x: dict(x.items()))
+
+        # ONNX OpenCV DNN
+        elif dnn:
+            LOGGER.info(f"Loading {w} for ONNX OpenCV DNN inference...")
+            check_requirements("opencv-python>=4.5.4")
+            net = cv2.dnn.readNetFromONNX(w)
+
+        # ONNX Runtime and IMX
+        elif onnx or imx:
+            LOGGER.info(f"Loading {w} for ONNX Runtime inference...")
+            check_requirements(("onnx", "onnxruntime-gpu" if cuda else "onnxruntime"))
+            if IS_RASPBERRYPI or IS_JETSON:
+                # Fix 'numpy.linalg._umath_linalg' has no attribute '_ilp64' for TF SavedModel on RPi and Jetson
+                check_requirements("numpy==1.23.5")
+            import onnxruntime
+
+            providers = ["CPUExecutionProvider"]
+            if cuda and "CUDAExecutionProvider" in onnxruntime.get_available_providers():
+                providers.insert(0, "CUDAExecutionProvider")
+            elif cuda:  # Only log warning if CUDA was requested but unavailable
+                LOGGER.warning("WARNING ⚠️ Failed to start ONNX Runtime with CUDA. Using CPU...")
+                device = torch.device("cpu")
+                cuda = False
+            LOGGER.info(f"Using ONNX Runtime {providers[0]}")
+            if onnx:
+                session = onnxruntime.InferenceSession(w, providers=providers)
+            else:
+                check_requirements(
+                    ["model-compression-toolkit==2.1.1", "sony-custom-layers[torch]==0.2.0", "onnxruntime-extensions"]
+                )
+                w = next(Path(w).glob("*.onnx"))
+                LOGGER.info(f"Loading {w} for ONNX IMX inference...")
+                import mct_quantizers as mctq
+                from sony_custom_layers.pytorch.object_detection import nms_ort  # noqa
+
+                session = onnxruntime.InferenceSession(
+                    w, mctq.get_ort_session_options(), providers=["CPUExecutionProvider"]
+                )
+                task = "detect"
+
+            output_names = [x.name for x in session.get_outputs()]
+            metadata = session.get_modelmeta().custom_metadata_map
+            dynamic = isinstance(session.get_outputs()[0].shape[0], str)
+            if not dynamic:
+                io = session.io_binding()
+                bindings = []
+                for output in session.get_outputs():
+                    y_tensor = torch.empty(output.shape, dtype=torch.float16 if fp16 else torch.float32).to(device)
+                    io.bind_output(
+                        name=output.name,
+                        device_type=device.type,
+                        device_id=device.index if cuda else 0,
+                        element_type=np.float16 if fp16 else np.float32,
+                        shape=tuple(y_tensor.shape),
+                        buffer_ptr=y_tensor.data_ptr(),
+                    )
+                    bindings.append(y_tensor)
+
+        # OpenVINO
+        elif xml:
+            LOGGER.info(f"Loading {w} for OpenVINO inference...")
+            check_requirements("openvino>=2024.0.0")
+            import openvino as ov
+
+            core = ov.Core()
+            w = Path(w)
+            if not w.is_file():  # if not *.xml
+                w = next(w.glob("*.xml"))  # get *.xml file from *_openvino_model dir
+            ov_model = core.read_model(model=str(w), weights=w.with_suffix(".bin"))
+            if ov_model.get_parameters()[0].get_layout().empty:
+                ov_model.get_parameters()[0].set_layout(ov.Layout("NCHW"))
+
+            # OpenVINO inference modes are 'LATENCY', 'THROUGHPUT' (not recommended), or 'CUMULATIVE_THROUGHPUT'
+            inference_mode = "CUMULATIVE_THROUGHPUT" if batch > 1 else "LATENCY"
+            LOGGER.info(f"Using OpenVINO {inference_mode} mode for batch={batch} inference...")
+            ov_compiled_model = core.compile_model(
+                ov_model,
+                device_name="AUTO",  # AUTO selects best available device, do not modify
+                config={"PERFORMANCE_HINT": inference_mode},
+            )
+            input_name = ov_compiled_model.input().get_any_name()
+            metadata = w.parent / "metadata.yaml"
+
+        # TensorRT
+        elif engine:
+            LOGGER.info(f"Loading {w} for TensorRT inference...")
+            try:
+                import tensorrt as trt  # noqa https://developer.nvidia.com/nvidia-tensorrt-download
+            except ImportError:
+                if LINUX:
+                    check_requirements("tensorrt>7.0.0,!=10.1.0")
+                import tensorrt as trt  # noqa
+            check_version(trt.__version__, ">=7.0.0", hard=True)
+            check_version(trt.__version__, "!=10.1.0", msg="https://github.com/ultralytics/ultralytics/pull/14239")
+            if device.type == "cpu":
+                device = torch.device("cuda:0")
+            Binding = namedtuple("Binding", ("name", "dtype", "shape", "data", "ptr"))
+            logger = trt.Logger(trt.Logger.INFO)
+            # Read file
+            with open(w, "rb") as f, trt.Runtime(logger) as runtime:
+                try:
+                    meta_len = int.from_bytes(f.read(4), byteorder="little")  # read metadata length
+                    metadata = json.loads(f.read(meta_len).decode("utf-8"))  # read metadata
+                except UnicodeDecodeError:
+                    f.seek(0)  # engine file may lack embedded Ultralytics metadata
+                model = runtime.deserialize_cuda_engine(f.read())  # read engine
+
+            # Model context
+            try:
+                context = model.create_execution_context()
+            except Exception as e:  # model is None
+                LOGGER.error(f"ERROR: TensorRT model exported with a different version than {trt.__version__}\n")
+                raise e
+
+            bindings = OrderedDict()
+            output_names = []
+            fp16 = False  # default updated below
+            dynamic = False
+            is_trt10 = not hasattr(model, "num_bindings")
+            num = range(model.num_io_tensors) if is_trt10 else range(model.num_bindings)
+            for i in num:
+                if is_trt10:
+                    name = model.get_tensor_name(i)
+                    dtype = trt.nptype(model.get_tensor_dtype(name))
+                    is_input = model.get_tensor_mode(name) == trt.TensorIOMode.INPUT
+                    if is_input:
+                        if -1 in tuple(model.get_tensor_shape(name)):
+                            dynamic = True
+                            context.set_input_shape(name, tuple(model.get_tensor_profile_shape(name, 0)[1]))
+                        if dtype == np.float16:
+                            fp16 = True
+                    else:
+                        output_names.append(name)
+                    shape = tuple(context.get_tensor_shape(name))
+                else:  # TensorRT < 10.0
+                    name = model.get_binding_name(i)
+                    dtype = trt.nptype(model.get_binding_dtype(i))
+                    is_input = model.binding_is_input(i)
+                    if model.binding_is_input(i):
+                        if -1 in tuple(model.get_binding_shape(i)):  # dynamic
+                            dynamic = True
+                            context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[1]))
+                        if dtype == np.float16:
+                            fp16 = True
+                    else:
+                        output_names.append(name)
+                    shape = tuple(context.get_binding_shape(i))
+                im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device)
+                bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr()))
+            binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items())
+            batch_size = bindings["images"].shape[0]  # if dynamic, this is instead max batch size
+
+        # CoreML
+        elif coreml:
+            LOGGER.info(f"Loading {w} for CoreML inference...")
+            import coremltools as ct
+
+            model = ct.models.MLModel(w)
+            metadata = dict(model.user_defined_metadata)
+
+        # TF SavedModel
+        elif saved_model:
+            LOGGER.info(f"Loading {w} for TensorFlow SavedModel inference...")
+            import tensorflow as tf
+
+            keras = False  # assume TF1 saved_model
+            model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w)
+            metadata = Path(w) / "metadata.yaml"
+
+        # TF GraphDef
+        elif pb:  # https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt
+            LOGGER.info(f"Loading {w} for TensorFlow GraphDef inference...")
+            import tensorflow as tf
+
+            from ultralytics.engine.exporter import gd_outputs
+
+            def wrap_frozen_graph(gd, inputs, outputs):
+                """Wrap frozen graphs for deployment."""
+                x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), [])  # wrapped
+                ge = x.graph.as_graph_element
+                return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs))
+
+            gd = tf.Graph().as_graph_def()  # TF GraphDef
+            with open(w, "rb") as f:
+                gd.ParseFromString(f.read())
+            frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd))
+            try:  # find metadata in SavedModel alongside GraphDef
+                metadata = next(Path(w).resolve().parent.rglob(f"{Path(w).stem}_saved_model*/metadata.yaml"))
+            except StopIteration:
+                pass
+
+        # TFLite or TFLite Edge TPU
+        elif tflite or edgetpu:  # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python
+            try:  # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu
+                from tflite_runtime.interpreter import Interpreter, load_delegate
+            except ImportError:
+                import tensorflow as tf
+
+                Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate
+            if edgetpu:  # TF Edge TPU https://coral.ai/software/#edgetpu-runtime
+                device = device[3:] if str(device).startswith("tpu") else ":0"
+                LOGGER.info(f"Loading {w} on device {device[1:]} for TensorFlow Lite Edge TPU inference...")
+                delegate = {"Linux": "libedgetpu.so.1", "Darwin": "libedgetpu.1.dylib", "Windows": "edgetpu.dll"}[
+                    platform.system()
+                ]
+                interpreter = Interpreter(
+                    model_path=w,
+                    experimental_delegates=[load_delegate(delegate, options={"device": device})],
+                )
+                device = "cpu"  # Required, otherwise PyTorch will try to use the wrong device
+            else:  # TFLite
+                LOGGER.info(f"Loading {w} for TensorFlow Lite inference...")
+                interpreter = Interpreter(model_path=w)  # load TFLite model
+            interpreter.allocate_tensors()  # allocate
+            input_details = interpreter.get_input_details()  # inputs
+            output_details = interpreter.get_output_details()  # outputs
+            # Load metadata
+            try:
+                with zipfile.ZipFile(w, "r") as model:
+                    meta_file = model.namelist()[0]
+                    metadata = ast.literal_eval(model.read(meta_file).decode("utf-8"))
+            except zipfile.BadZipFile:
+                pass
+
+        # TF.js
+        elif tfjs:
+            raise NotImplementedError("YOLOv8 TF.js inference is not currently supported.")
+
+        # PaddlePaddle
+        elif paddle:
+            LOGGER.info(f"Loading {w} for PaddlePaddle inference...")
+            check_requirements("paddlepaddle-gpu" if cuda else "paddlepaddle")
+            import paddle.inference as pdi  # noqa
+
+            w = Path(w)
+            if not w.is_file():  # if not *.pdmodel
+                w = next(w.rglob("*.pdmodel"))  # get *.pdmodel file from *_paddle_model dir
+            config = pdi.Config(str(w), str(w.with_suffix(".pdiparams")))
+            if cuda:
+                config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0)
+            predictor = pdi.create_predictor(config)
+            input_handle = predictor.get_input_handle(predictor.get_input_names()[0])
+            output_names = predictor.get_output_names()
+            metadata = w.parents[1] / "metadata.yaml"
+
+        # MNN
+        elif mnn:
+            LOGGER.info(f"Loading {w} for MNN inference...")
+            check_requirements("MNN")  # requires MNN
+            import os
+
+            import MNN
+
+            config = {"precision": "low", "backend": "CPU", "numThread": (os.cpu_count() + 1) // 2}
+            rt = MNN.nn.create_runtime_manager((config,))
+            net = MNN.nn.load_module_from_file(w, [], [], runtime_manager=rt, rearrange=True)
+
+            def torch_to_mnn(x):
+                return MNN.expr.const(x.data_ptr(), x.shape)
+
+            metadata = json.loads(net.get_info()["bizCode"])
+
+        # NCNN
+        elif ncnn:
+            LOGGER.info(f"Loading {w} for NCNN inference...")
+            check_requirements("git+https://github.com/Tencent/ncnn.git" if ARM64 else "ncnn")  # requires NCNN
+            import ncnn as pyncnn
+
+            net = pyncnn.Net()
+            net.opt.use_vulkan_compute = cuda
+            w = Path(w)
+            if not w.is_file():  # if not *.param
+                w = next(w.glob("*.param"))  # get *.param file from *_ncnn_model dir
+            net.load_param(str(w))
+            net.load_model(str(w.with_suffix(".bin")))
+            metadata = w.parent / "metadata.yaml"
+
+        # NVIDIA Triton Inference Server
+        elif triton:
+            check_requirements("tritonclient[all]")
+            from ultralytics.utils.triton import TritonRemoteModel
+
+            model = TritonRemoteModel(w)
+            metadata = model.metadata
+
+        # Any other format (unsupported)
+        else:
+            from ultralytics.engine.exporter import export_formats
+
+            raise TypeError(
+                f"model='{w}' is not a supported model format. Ultralytics supports: {export_formats()['Format']}\n"
+                f"See https://docs.ultralytics.com/modes/predict for help."
+            )
+
+        # Load external metadata YAML
+        if isinstance(metadata, (str, Path)) and Path(metadata).exists():
+            metadata = yaml_load(metadata)
+        if metadata and isinstance(metadata, dict):
+            for k, v in metadata.items():
+                if k in {"stride", "batch"}:
+                    metadata[k] = int(v)
+                elif k in {"imgsz", "names", "kpt_shape"} and isinstance(v, str):
+                    metadata[k] = eval(v)
+            stride = metadata["stride"]
+            task = metadata["task"]
+            batch = metadata["batch"]
+            imgsz = metadata["imgsz"]
+            names = metadata["names"]
+            kpt_shape = metadata.get("kpt_shape")
+        elif not (pt or triton or nn_module):
+            LOGGER.warning(f"WARNING ⚠️ Metadata not found for 'model={weights}'")
+
+        # Check names
+        if "names" not in locals():  # names missing
+            names = default_class_names(data)
+        names = check_class_names(names)
+
+        # Disable gradients
+        if pt:
+            for p in model.parameters():
+                p.requires_grad = False
+
+        self.__dict__.update(locals())  # assign all variables to self
+
+    def forward(self, im, augment=False, visualize=False, embed=None):
+        """
+        Runs inference on the AutoBackend model.
+
+        Args:
+            im (torch.Tensor): The image tensor to perform inference on.
+            augment (bool): Whether to perform data augmentation during inference. Defaults to False.
+            visualize (bool): Whether to visualize the output predictions. Defaults to False.
+            embed (list, optional): A list of feature vectors/embeddings to return.
+
+        Returns:
+            (tuple): Tuple containing the raw output tensor and the processed output for visualization (if visualize=True).
+        """
+        b, ch, h, w = im.shape  # batch, channel, height, width
+        if self.fp16 and im.dtype != torch.float16:
+            im = im.half()  # to FP16
+        if self.nhwc:
+            im = im.permute(0, 2, 3, 1)  # torch BCHW to numpy BHWC shape(1,320,192,3)
+
+        # PyTorch
+        if self.pt or self.nn_module:
+            y = self.model(im, augment=augment, visualize=visualize, embed=embed)
+
+        # TorchScript
+        elif self.jit:
+            y = self.model(im)
+
+        # ONNX OpenCV DNN
+        elif self.dnn:
+            im = im.cpu().numpy()  # torch to numpy
+            self.net.setInput(im)
+            y = self.net.forward()
+
+        # ONNX Runtime
+        elif self.onnx or self.imx:
+            if self.dynamic:
+                im = im.cpu().numpy()  # torch to numpy
+                y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im})
+            else:
+                if not self.cuda:
+                    im = im.cpu()
+                self.io.bind_input(
+                    name="images",
+                    device_type=im.device.type,
+                    device_id=im.device.index if im.device.type == "cuda" else 0,
+                    element_type=np.float16 if self.fp16 else np.float32,
+                    shape=tuple(im.shape),
+                    buffer_ptr=im.data_ptr(),
+                )
+                self.session.run_with_iobinding(self.io)
+                y = self.bindings
+            if self.imx:
+                # boxes, conf, cls
+                y = np.concatenate([y[0], y[1][:, :, None], y[2][:, :, None]], axis=-1)
+
+        # OpenVINO
+        elif self.xml:
+            im = im.cpu().numpy()  # FP32
+
+            if self.inference_mode in {"THROUGHPUT", "CUMULATIVE_THROUGHPUT"}:  # optimized for larger batch-sizes
+                n = im.shape[0]  # number of images in batch
+                results = [None] * n  # preallocate list with None to match the number of images
+
+                def callback(request, userdata):
+                    """Places result in preallocated list using userdata index."""
+                    results[userdata] = request.results
+
+                # Create AsyncInferQueue, set the callback and start asynchronous inference for each input image
+                async_queue = self.ov.runtime.AsyncInferQueue(self.ov_compiled_model)
+                async_queue.set_callback(callback)
+                for i in range(n):
+                    # Start async inference with userdata=i to specify the position in results list
+                    async_queue.start_async(inputs={self.input_name: im[i : i + 1]}, userdata=i)  # keep image as BCHW
+                async_queue.wait_all()  # wait for all inference requests to complete
+                y = np.concatenate([list(r.values())[0] for r in results])
+
+            else:  # inference_mode = "LATENCY", optimized for fastest first result at batch-size 1
+                y = list(self.ov_compiled_model(im).values())
+
+        # TensorRT
+        elif self.engine:
+            if self.dynamic and im.shape != self.bindings["images"].shape:
+                if self.is_trt10:
+                    self.context.set_input_shape("images", im.shape)
+                    self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
+                    for name in self.output_names:
+                        self.bindings[name].data.resize_(tuple(self.context.get_tensor_shape(name)))
+                else:
+                    i = self.model.get_binding_index("images")
+                    self.context.set_binding_shape(i, im.shape)
+                    self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape)
+                    for name in self.output_names:
+                        i = self.model.get_binding_index(name)
+                        self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i)))
+
+            s = self.bindings["images"].shape
+            assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}"
+            self.binding_addrs["images"] = int(im.data_ptr())
+            self.context.execute_v2(list(self.binding_addrs.values()))
+            y = [self.bindings[x].data for x in sorted(self.output_names)]
+
+        # CoreML
+        elif self.coreml:
+            im = im[0].cpu().numpy()
+            im_pil = Image.fromarray((im * 255).astype("uint8"))
+            # im = im.resize((192, 320), Image.BILINEAR)
+            y = self.model.predict({"image": im_pil})  # coordinates are xywh normalized
+            if "confidence" in y:
+                raise TypeError(
+                    "Ultralytics only supports inference of non-pipelined CoreML models exported with "
+                    f"'nms=False', but 'model={w}' has an NMS pipeline created by an 'nms=True' export."
+                )
+                # TODO: CoreML NMS inference handling
+                # from ultralytics.utils.ops import xywh2xyxy
+                # box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]])  # xyxy pixels
+                # conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float32)
+                # y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1)
+            y = list(y.values())
+            if len(y) == 2 and len(y[1].shape) != 4:  # segmentation model
+                y = list(reversed(y))  # reversed for segmentation models (pred, proto)
+
+        # PaddlePaddle
+        elif self.paddle:
+            im = im.cpu().numpy().astype(np.float32)
+            self.input_handle.copy_from_cpu(im)
+            self.predictor.run()
+            y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names]
+
+        # MNN
+        elif self.mnn:
+            input_var = self.torch_to_mnn(im)
+            output_var = self.net.onForward([input_var])
+            y = [x.read() for x in output_var]
+
+        # NCNN
+        elif self.ncnn:
+            mat_in = self.pyncnn.Mat(im[0].cpu().numpy())
+            with self.net.create_extractor() as ex:
+                ex.input(self.net.input_names()[0], mat_in)
+                # WARNING: 'output_names' sorted as a temporary fix for https://github.com/pnnx/pnnx/issues/130
+                y = [np.array(ex.extract(x)[1])[None] for x in sorted(self.net.output_names())]
+
+        # NVIDIA Triton Inference Server
+        elif self.triton:
+            im = im.cpu().numpy()  # torch to numpy
+            y = self.model(im)
+
+        # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU)
+        else:
+            im = im.cpu().numpy()
+            if self.saved_model:  # SavedModel
+                y = self.model(im, training=False) if self.keras else self.model(im)
+                if not isinstance(y, list):
+                    y = [y]
+            elif self.pb:  # GraphDef
+                y = self.frozen_func(x=self.tf.constant(im))
+            else:  # Lite or Edge TPU
+                details = self.input_details[0]
+                is_int = details["dtype"] in {np.int8, np.int16}  # is TFLite quantized int8 or int16 model
+                if is_int:
+                    scale, zero_point = details["quantization"]
+                    im = (im / scale + zero_point).astype(details["dtype"])  # de-scale
+                self.interpreter.set_tensor(details["index"], im)
+                self.interpreter.invoke()
+                y = []
+                for output in self.output_details:
+                    x = self.interpreter.get_tensor(output["index"])
+                    if is_int:
+                        scale, zero_point = output["quantization"]
+                        x = (x.astype(np.float32) - zero_point) * scale  # re-scale
+                    if x.ndim == 3:  # if task is not classification, excluding masks (ndim=4) as well
+                        # Denormalize xywh by image size. See https://github.com/ultralytics/ultralytics/pull/1695
+                        # xywh are normalized in TFLite/EdgeTPU to mitigate quantization error of integer models
+                        if x.shape[-1] == 6:  # end-to-end model
+                            x[:, :, [0, 2]] *= w
+                            x[:, :, [1, 3]] *= h
+                        else:
+                            x[:, [0, 2]] *= w
+                            x[:, [1, 3]] *= h
+                            if self.task == "pose":
+                                x[:, 5::3] *= w
+                                x[:, 6::3] *= h
+                    y.append(x)
+            # TF segment fixes: export is reversed vs ONNX export and protos are transposed
+            if len(y) == 2:  # segment with (det, proto) output order reversed
+                if len(y[1].shape) != 4:
+                    y = list(reversed(y))  # should be y = (1, 116, 8400), (1, 160, 160, 32)
+                if y[1].shape[-1] == 6:  # end-to-end model
+                    y = [y[1]]
+                else:
+                    y[1] = np.transpose(y[1], (0, 3, 1, 2))  # should be y = (1, 116, 8400), (1, 32, 160, 160)
+            y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y]
+
+        # for x in y:
+        #     print(type(x), len(x)) if isinstance(x, (list, tuple)) else print(type(x), x.shape)  # debug shapes
+        if isinstance(y, (list, tuple)):
+            if len(self.names) == 999 and (self.task == "segment" or len(y) == 2):  # segments and names not defined
+                nc = y[0].shape[1] - y[1].shape[1] - 4  # y = (1, 32, 160, 160), (1, 116, 8400)
+                self.names = {i: f"class{i}" for i in range(nc)}
+            return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y]
+        else:
+            return self.from_numpy(y)
+
+    def from_numpy(self, x):
+        """
+        Convert a numpy array to a tensor.
+
+        Args:
+            x (np.ndarray): The array to be converted.
+
+        Returns:
+            (torch.Tensor): The converted tensor
+        """
+        return torch.tensor(x).to(self.device) if isinstance(x, np.ndarray) else x
+
+    def warmup(self, imgsz=(1, 3, 640, 640)):
+        """
+        Warm up the model by running one forward pass with a dummy input.
+
+        Args:
+            imgsz (tuple): The shape of the dummy input tensor in the format (batch_size, channels, height, width)
+        """
+        import torchvision  # noqa (import here so torchvision import time not recorded in postprocess time)
+
+        warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton, self.nn_module
+        if any(warmup_types) and (self.device.type != "cpu" or self.triton):
+            im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device)  # input
+            for _ in range(2 if self.jit else 1):
+                self.forward(im)  # warmup
+
+    @staticmethod
+    def _model_type(p="path/to/model.pt"):
+        """
+        Takes a path to a model file and returns the model type. Possible types are pt, jit, onnx, xml, engine,
+        coreml, saved_model, pb, tflite, edgetpu, tfjs, ncnn or paddle.
+
+        Args:
+            p (str): Path to the model file. Defaults to 'path/to/model.pt'.
+
+        Examples:
+            >>> types = AutoBackend._model_type("path/to/model.onnx")  # list of booleans, True at the ONNX entry
+        """
+        from ultralytics.engine.exporter import export_formats
+
+        sf = export_formats()["Suffix"]  # export suffixes
+        if not is_url(p) and not isinstance(p, str):
+            check_suffix(p, sf)  # checks
+        name = Path(p).name
+        types = [s in name for s in sf]
+        types[5] |= name.endswith(".mlmodel")  # retain support for older Apple CoreML *.mlmodel formats
+        types[8] &= not types[9]  # tflite &= not edgetpu
+        if any(types):
+            triton = False
+        else:
+            from urllib.parse import urlsplit
+
+            url = urlsplit(p)
+            triton = bool(url.netloc) and bool(url.path) and url.scheme in {"http", "grpc"}
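+            # e.g. _model_type("http://localhost:8000/yolo") -> all-False types + triton=True
+            # (illustrative URL: any http/grpc URL with a netloc and a path counts as Triton)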
+
+        return types + [triton]

+ 165 - 0
ultralytics/nn/modules/__init__.py

@@ -0,0 +1,165 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""
+Ultralytics modules.
+
+Example:
+    Visualize a module with Netron.
+    ```python
+    from ultralytics.nn.modules import *
+    import torch
+    import os
+
+    x = torch.ones(1, 128, 40, 40)
+    m = Conv(128, 128)
+    f = f"{m._get_name()}.onnx"
+    torch.onnx.export(m, x, f)
+    os.system(f"onnxslim {f} {f} && open {f}")  # pip install onnxslim
+    ```
+"""
+
+from .block import (
+    C1,
+    C2,
+    C2PSA,
+    C3,
+    C3TR,
+    CIB,
+    DFL,
+    ELAN1,
+    PSA,
+    SPP,
+    SPPELAN,
+    SPPF,
+    AConv,
+    ADown,
+    Attention,
+    BNContrastiveHead,
+    Bottleneck,
+    BottleneckCSP,
+    C2f,
+    C2fAttn,
+    C2fCIB,
+    C2fPSA,
+    C3Ghost,
+    C3k2,
+    C3x,
+    CBFuse,
+    CBLinear,
+    ContrastiveHead,
+    GhostBottleneck,
+    HGBlock,
+    HGStem,
+    ImagePoolingAttn,
+    Proto,
+    RepC3,
+    RepNCSPELAN4,
+    RepVGGDW,
+    ResNetLayer,
+    SCDown,
+    TorchVision,
+    A2C2f,
+)
+from .conv import (
+    CBAM,
+    ChannelAttention,
+    Concat,
+    Conv,
+    Conv2,
+    ConvTranspose,
+    DWConv,
+    DWConvTranspose2d,
+    Focus,
+    GhostConv,
+    Index,
+    LightConv,
+    RepConv,
+    SpatialAttention,
+)
+from .head import OBB, Classify, Detect, Pose, RTDETRDecoder, Segment, WorldDetect, v10Detect
+from .transformer import (
+    AIFI,
+    MLP,
+    DeformableTransformerDecoder,
+    DeformableTransformerDecoderLayer,
+    LayerNorm2d,
+    MLPBlock,
+    MSDeformAttn,
+    TransformerBlock,
+    TransformerEncoderLayer,
+    TransformerLayer,
+)
+
+__all__ = (
+    "Conv",
+    "Conv2",
+    "LightConv",
+    "RepConv",
+    "DWConv",
+    "DWConvTranspose2d",
+    "ConvTranspose",
+    "Focus",
+    "GhostConv",
+    "ChannelAttention",
+    "SpatialAttention",
+    "CBAM",
+    "Concat",
+    "TransformerLayer",
+    "TransformerBlock",
+    "MLPBlock",
+    "LayerNorm2d",
+    "DFL",
+    "HGBlock",
+    "HGStem",
+    "SPP",
+    "SPPF",
+    "C1",
+    "C2",
+    "C3",
+    "C2f",
+    "C3k2",
+    "SCDown",
+    "C2fPSA",
+    "C2PSA",
+    "C2fAttn",
+    "C3x",
+    "C3TR",
+    "C3Ghost",
+    "GhostBottleneck",
+    "Bottleneck",
+    "BottleneckCSP",
+    "Proto",
+    "Detect",
+    "Segment",
+    "Pose",
+    "Classify",
+    "TransformerEncoderLayer",
+    "RepC3",
+    "RTDETRDecoder",
+    "AIFI",
+    "DeformableTransformerDecoder",
+    "DeformableTransformerDecoderLayer",
+    "MSDeformAttn",
+    "MLP",
+    "ResNetLayer",
+    "OBB",
+    "WorldDetect",
+    "v10Detect",
+    "ImagePoolingAttn",
+    "ContrastiveHead",
+    "BNContrastiveHead",
+    "RepNCSPELAN4",
+    "ADown",
+    "SPPELAN",
+    "CBFuse",
+    "CBLinear",
+    "AConv",
+    "ELAN1",
+    "RepVGGDW",
+    "CIB",
+    "C2fCIB",
+    "Attention",
+    "PSA",
+    "TorchVision",
+    "Index",
+    "A2C2f"
+)

+ 21 - 0
ultralytics/nn/modules/activation.py

@@ -0,0 +1,21 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""Activation modules."""
+
+import torch
+import torch.nn as nn
+
+
+class AGLU(nn.Module):
+    """Unified activation function module from https://github.com/kostas1515/AGLU."""
+
+    def __init__(self, device=None, dtype=None) -> None:
+        """Initialize the Unified activation function."""
+        super().__init__()
+        self.act = nn.Softplus(beta=-1.0)
+        self.lambd = nn.Parameter(nn.init.uniform_(torch.empty(1, device=device, dtype=dtype)))  # lambda parameter
+        self.kappa = nn.Parameter(nn.init.uniform_(torch.empty(1, device=device, dtype=dtype)))  # kappa parameter
+
+    def forward(self, x: torch.Tensor) -> torch.Tensor:
+        """Compute the forward pass of the Unified activation function."""
+        lam = torch.clamp(self.lambd, min=0.0001)
+        return torch.exp((1 / lam) * self.act((self.kappa * x) - torch.log(lam)))
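+
+
+# AGLU evaluates exp((1 / lambda) * softplus_{beta=-1}(kappa * x - log(lambda))) element-wise,
+# a smooth, learnable generalisation of common activations. Quick shape check (illustrative):
+# >>> act = AGLU()
+# >>> act(torch.randn(2, 8)).shape
+# torch.Size([2, 8])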

File diff not shown because it is too large
+ 1392 - 0
ultralytics/nn/modules/block.py


File diff not shown because it is too large
+ 1335 - 0
ultralytics/nn/modules/block_reshape.py


+ 350 - 0
ultralytics/nn/modules/conv.py

@@ -0,0 +1,350 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""Convolution modules."""
+
+import math
+
+import numpy as np
+import torch
+import torch.nn as nn
+
+__all__ = (
+    "Conv",
+    "Conv2",
+    "LightConv",
+    "DWConv",
+    "DWConvTranspose2d",
+    "ConvTranspose",
+    "Focus",
+    "GhostConv",
+    "ChannelAttention",
+    "SpatialAttention",
+    "CBAM",
+    "Concat",
+    "RepConv",
+    "Index",
+)
+
+
+def autopad(k, p=None, d=1):  # kernel, padding, dilation
+    """Pad to 'same' shape outputs."""
+    if d > 1:
+        k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k]  # actual kernel-size
+    if p is None:
+        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
+    return p
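+# Examples (values follow from the formula above): autopad(3) == 1 for a plain 3x3 kernel,
+# and autopad(3, d=2) == 2, since dilation first grows the effective kernel size to 5.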
+
+
+class Conv(nn.Module):
+    """Standard convolution with args(ch_in, ch_out, kernel, stride, padding, groups, dilation, activation)."""
+
+    default_act = nn.SiLU()  # default activation
+
+    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True, bias=False):  # bias kept last so positional callers (e.g. Focus, GhostConv) still bind p and g correctly
+        """Initialize Conv layer with given arguments including activation."""
+        super().__init__()
+        self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=bias)
+        self.bn = nn.BatchNorm2d(c2)
+        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
+
+    def forward(self, x):
+        """Apply convolution, batch normalization and activation to input tensor."""
+        return self.act(self.bn(self.conv(x)))
+
+    def forward_fuse(self, x):
+        """Apply convolution and activation without batch normalization."""
+        return self.act(self.conv(x))
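+
+    # Usage sketch: Conv(3, 16, k=3, s=2) halves spatial resolution with 'same'-style padding
+    # from autopad(); at export time Conv+BN are typically fused and forward_fuse() is used:
+    # >>> Conv(3, 16, 3, 2)(torch.zeros(1, 3, 64, 64)).shape
+    # torch.Size([1, 16, 32, 32])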
+
+
+class Conv2(Conv):
+    """Simplified RepConv module with Conv fusing."""
+
+    def __init__(self, c1, c2, k=3, s=1, p=None, g=1, d=1, act=True):
+        """Initialize Conv layer with given arguments including activation."""
+        super().__init__(c1, c2, k, s, p, g=g, d=d, act=act)
+        self.cv2 = nn.Conv2d(c1, c2, 1, s, autopad(1, p, d), groups=g, dilation=d, bias=False)  # add 1x1 conv
+
+    def forward(self, x):
+        """Apply convolution, batch normalization and activation to input tensor."""
+        return self.act(self.bn(self.conv(x) + self.cv2(x)))
+
+    def forward_fuse(self, x):
+        """Apply fused convolution, batch normalization and activation to input tensor."""
+        return self.act(self.bn(self.conv(x)))
+
+    def fuse_convs(self):
+        """Fuse parallel convolutions."""
+        w = torch.zeros_like(self.conv.weight.data)
+        i = [x // 2 for x in w.shape[2:]]
+        w[:, :, i[0] : i[0] + 1, i[1] : i[1] + 1] = self.cv2.weight.data.clone()
+        self.conv.weight.data += w
+        self.__delattr__("cv2")
+        self.forward = self.forward_fuse
+
+
+class LightConv(nn.Module):
+    """
+    Light convolution with args(ch_in, ch_out, kernel).
+
+    https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/backbones/hgnet_v2.py
+    """
+
+    def __init__(self, c1, c2, k=1, act=nn.ReLU()):
+        """Initialize Conv layer with given arguments including activation."""
+        super().__init__()
+        self.conv1 = Conv(c1, c2, 1, act=False)
+        self.conv2 = DWConv(c2, c2, k, act=act)
+
+    def forward(self, x):
+        """Apply 2 convolutions to input tensor."""
+        return self.conv2(self.conv1(x))
+
+
+class DWConv(Conv):
+    """Depth-wise convolution."""
+
+    def __init__(self, c1, c2, k=1, s=1, d=1, act=True):  # ch_in, ch_out, kernel, stride, dilation, activation
+        """Initialize Depth-wise convolution with given parameters."""
+        super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), d=d, act=act)
+
+
+class DWConvTranspose2d(nn.ConvTranspose2d):
+    """Depth-wise transpose convolution."""
+
+    def __init__(self, c1, c2, k=1, s=1, p1=0, p2=0):  # ch_in, ch_out, kernel, stride, padding, padding_out
+        """Initialize DWConvTranspose2d class with given parameters."""
+        super().__init__(c1, c2, k, s, p1, p2, groups=math.gcd(c1, c2))
+
+
+class ConvTranspose(nn.Module):
+    """Convolution transpose 2d layer."""
+
+    default_act = nn.SiLU()  # default activation
+
+    def __init__(self, c1, c2, k=2, s=2, p=0, bn=True, act=True):
+        """Initialize ConvTranspose2d layer with batch normalization and activation function."""
+        super().__init__()
+        self.conv_transpose = nn.ConvTranspose2d(c1, c2, k, s, p, bias=not bn)
+        self.bn = nn.BatchNorm2d(c2) if bn else nn.Identity()
+        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
+
+    def forward(self, x):
+        """Applies transposed convolutions, batch normalization and activation to input."""
+        return self.act(self.bn(self.conv_transpose(x)))
+
+    def forward_fuse(self, x):
+        """Applies activation and convolution transpose operation to input."""
+        return self.act(self.conv_transpose(x))
+
+
+class Focus(nn.Module):
+    """Focus wh information into c-space."""
+
+    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):
+        """Initializes Focus object with user defined channel, convolution, padding, group and activation values."""
+        super().__init__()
+        self.conv = Conv(c1 * 4, c2, k, s, p, g, act=act)
+        # self.contract = Contract(gain=2)
+
+    def forward(self, x):
+        """
+        Applies convolution to concatenated tensor and returns the output.
+
+        Input shape is (b,c,h,w) and output shape is (b,4c,h/2,w/2).
+        """
+        return self.conv(torch.cat((x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]), 1))
+        # return self.conv(self.contract(x))
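+        # Shape sketch: a (1, 3, 640, 640) input yields four (1, 3, 320, 320) pixel-offset
+        # slices, concatenated to (1, 12, 320, 320) before the convolution maps 4*c1 -> c2.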
+
+
+class GhostConv(nn.Module):
+    """Ghost Convolution https://github.com/huawei-noah/ghostnet."""
+
+    def __init__(self, c1, c2, k=1, s=1, g=1, act=True):
+        """Initializes Ghost Convolution module with primary and cheap operations for efficient feature learning."""
+        super().__init__()
+        c_ = c2 // 2  # hidden channels
+        self.cv1 = Conv(c1, c_, k, s, None, g, act=act)
+        self.cv2 = Conv(c_, c_, 5, 1, None, c_, act=act)
+
+    def forward(self, x):
+        """Forward propagation through a Ghost Bottleneck layer with skip connection."""
+        y = self.cv1(x)
+        return torch.cat((y, self.cv2(y)), 1)
+
+
+class RepConv(nn.Module):
+    """
+    RepConv is a basic rep-style block, including training and deploy status.
+
+    This module is used in RT-DETR.
+    Based on https://github.com/DingXiaoH/RepVGG/blob/main/repvgg.py
+    """
+
+    default_act = nn.SiLU()  # default activation
+
+    def __init__(self, c1, c2, k=3, s=1, p=1, g=1, d=1, act=True, bn=False, deploy=False):
+        """Initializes Light Convolution layer with inputs, outputs & optional activation function."""
+        super().__init__()
+        assert k == 3 and p == 1
+        self.g = g
+        self.c1 = c1
+        self.c2 = c2
+        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()
+
+        self.bn = nn.BatchNorm2d(num_features=c1) if bn and c2 == c1 and s == 1 else None
+        self.conv1 = Conv(c1, c2, k, s, p=p, g=g, act=False)
+        self.conv2 = Conv(c1, c2, 1, s, p=(p - k // 2), g=g, act=False)
+
+    def forward_fuse(self, x):
+        """Forward process."""
+        return self.act(self.conv(x))
+
+    def forward(self, x):
+        """Forward process."""
+        id_out = 0 if self.bn is None else self.bn(x)
+        return self.act(self.conv1(x) + self.conv2(x) + id_out)
+
+    def get_equivalent_kernel_bias(self):
+        """Returns equivalent kernel and bias by adding 3x3 kernel, 1x1 kernel and identity kernel with their biases."""
+        kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1)
+        kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2)
+        kernelid, biasid = self._fuse_bn_tensor(self.bn)
+        return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1) + kernelid, bias3x3 + bias1x1 + biasid
+
+    @staticmethod
+    def _pad_1x1_to_3x3_tensor(kernel1x1):
+        """Pads a 1x1 tensor to a 3x3 tensor."""
+        if kernel1x1 is None:
+            return 0
+        else:
+            return torch.nn.functional.pad(kernel1x1, [1, 1, 1, 1])
+
+    def _fuse_bn_tensor(self, branch):
+        """Generates appropriate kernels and biases for convolution by fusing branches of the neural network."""
+        if branch is None:
+            return 0, 0
+        if isinstance(branch, Conv):
+            kernel = branch.conv.weight
+            running_mean = branch.bn.running_mean
+            running_var = branch.bn.running_var
+            gamma = branch.bn.weight
+            beta = branch.bn.bias
+            eps = branch.bn.eps
+        elif isinstance(branch, nn.BatchNorm2d):
+            if not hasattr(self, "id_tensor"):
+                input_dim = self.c1 // self.g
+                kernel_value = np.zeros((self.c1, input_dim, 3, 3), dtype=np.float32)
+                for i in range(self.c1):
+                    kernel_value[i, i % input_dim, 1, 1] = 1
+                self.id_tensor = torch.from_numpy(kernel_value).to(branch.weight.device)
+            kernel = self.id_tensor
+            running_mean = branch.running_mean
+            running_var = branch.running_var
+            gamma = branch.weight
+            beta = branch.bias
+            eps = branch.eps
+        std = (running_var + eps).sqrt()
+        t = (gamma / std).reshape(-1, 1, 1, 1)
+        return kernel * t, beta - running_mean * gamma / std
+
+    def fuse_convs(self):
+        """Combines two convolution layers into a single layer and removes unused attributes from the class."""
+        if hasattr(self, "conv"):
+            return
+        kernel, bias = self.get_equivalent_kernel_bias()
+        self.conv = nn.Conv2d(
+            in_channels=self.conv1.conv.in_channels,
+            out_channels=self.conv1.conv.out_channels,
+            kernel_size=self.conv1.conv.kernel_size,
+            stride=self.conv1.conv.stride,
+            padding=self.conv1.conv.padding,
+            dilation=self.conv1.conv.dilation,
+            groups=self.conv1.conv.groups,
+            bias=True,
+        ).requires_grad_(False)
+        self.conv.weight.data = kernel
+        self.conv.bias.data = bias
+        for para in self.parameters():
+            para.detach_()
+        self.__delattr__("conv1")
+        self.__delattr__("conv2")
+        if hasattr(self, "nm"):
+            self.__delattr__("nm")
+        if hasattr(self, "bn"):
+            self.__delattr__("bn")
+        if hasattr(self, "id_tensor"):
+            self.__delattr__("id_tensor")
+
+
+class ChannelAttention(nn.Module):
+    """Channel-attention module https://github.com/open-mmlab/mmdetection/tree/v3.0.0rc1/configs/rtmdet."""
+
+    def __init__(self, channels: int) -> None:
+        """Initializes the class and sets the basic configurations and instance variables required."""
+        super().__init__()
+        self.pool = nn.AdaptiveAvgPool2d(1)
+        self.fc = nn.Conv2d(channels, channels, 1, 1, 0, bias=True)
+        self.act = nn.Sigmoid()
+
+    def forward(self, x: torch.Tensor) -> torch.Tensor:
+        """Applies forward pass using activation on convolutions of the input, optionally using batch normalization."""
+        return x * self.act(self.fc(self.pool(x)))
+
+
+class SpatialAttention(nn.Module):
+    """Spatial-attention module."""
+
+    def __init__(self, kernel_size=7):
+        """Initialize Spatial-attention module with kernel size argument."""
+        super().__init__()
+        assert kernel_size in {3, 7}, "kernel size must be 3 or 7"
+        padding = 3 if kernel_size == 7 else 1
+        self.cv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
+        self.act = nn.Sigmoid()
+
+    def forward(self, x):
+        """Apply channel and spatial attention on input for feature recalibration."""
+        return x * self.act(self.cv1(torch.cat([torch.mean(x, 1, keepdim=True), torch.max(x, 1, keepdim=True)[0]], 1)))
+
+
+class CBAM(nn.Module):
+    """Convolutional Block Attention Module."""
+
+    def __init__(self, c1, kernel_size=7):
+        """Initialize CBAM with given input channel (c1) and kernel size."""
+        super().__init__()
+        self.channel_attention = ChannelAttention(c1)
+        self.spatial_attention = SpatialAttention(kernel_size)
+
+    def forward(self, x):
+        """Applies the forward pass through C1 module."""
+        return self.spatial_attention(self.channel_attention(x))
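+
+    # CBAM preserves the input shape: channel gating rescales per-channel responses, then
+    # spatial gating rescales per-location responses, e.g. CBAM(64)(x) for x of shape (B, 64, H, W).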
+
+
+class Concat(nn.Module):
+    """Concatenate a list of tensors along dimension."""
+
+    def __init__(self, dimension=1):
+        """Concatenates a list of tensors along a specified dimension."""
+        super().__init__()
+        self.d = dimension
+
+    def forward(self, x):
+        """Forward pass for the YOLOv8 mask Proto module."""
+        return torch.cat(x, self.d)
+
+
+class Index(nn.Module):
+    """Returns a particular index of the input."""
+
+    def __init__(self, c1, c2, index=0):
+        """Returns a particular index of the input."""
+        super().__init__()
+        self.index = index
+
+    def forward(self, x):
+        """
+        Forward pass.
+
+        Expects a list of tensors as input.
+        """
+        return x[self.index]
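+
+
+# Usage sketch: Index(c1, c2, index=1)([t0, t1, t2]) returns t1; c1 and c2 are accepted but
+# unused, presumably so the model parser can treat Index like other (c1, c2) layers.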

+ 625 - 0
ultralytics/nn/modules/head.py

@@ -0,0 +1,625 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""Model head modules."""
+
+import copy
+import math
+
+import torch
+import torch.nn as nn
+from torch.nn.init import constant_, xavier_uniform_
+
+from ultralytics.utils.tal import TORCH_1_10, dist2bbox, dist2rbox, make_anchors
+
+from .block import DFL, BNContrastiveHead, ContrastiveHead, Proto
+from .conv import Conv, DWConv
+from .transformer import MLP, DeformableTransformerDecoder, DeformableTransformerDecoderLayer
+from .utils import bias_init_with_prob, linear_init
+
+__all__ = "Detect", "Segment", "Pose", "Classify", "OBB", "RTDETRDecoder", "v10Detect"
+
+
+class Detect(nn.Module):
+    """YOLO Detect head for detection models."""
+
+    dynamic = False  # force grid reconstruction
+    export = False  # export mode
+    format = None  # export format
+    end2end = False  # end2end
+    max_det = 300  # max_det
+    shape = None
+    anchors = torch.empty(0)  # init
+    strides = torch.empty(0)  # init
+    legacy = False  # backward compatibility for v3/v5/v8/v9 models
+
+    def __init__(self, nc=80, ch=()):
+        """Initializes the YOLO detection layer with specified number of classes and channels."""
+        super().__init__()
+        self.nc = nc  # number of classes
+        self.nl = len(ch)  # number of detection layers
+        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
+        self.no = nc + self.reg_max * 4  # number of outputs per anchor
+        self.stride = torch.zeros(self.nl)  # strides computed during build
+        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], min(self.nc, 100))  # channels
+        self.cv2 = nn.ModuleList(
+            nn.Sequential(Conv(x, c2, 3), Conv(c2, c2, 3), nn.Conv2d(c2, 4 * self.reg_max, 1)) for x in ch
+        )
+        self.cv3 = (
+            nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, self.nc, 1)) for x in ch)
+            if self.legacy
+            else nn.ModuleList(
+                nn.Sequential(
+                    nn.Sequential(DWConv(x, x, 3), Conv(x, c3, 1)),
+                    nn.Sequential(DWConv(c3, c3, 3), Conv(c3, c3, 1)),
+                    nn.Conv2d(c3, self.nc, 1),
+                )
+                for x in ch
+            )
+        )
+        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()
+
+        if self.end2end:
+            self.one2one_cv2 = copy.deepcopy(self.cv2)
+            self.one2one_cv3 = copy.deepcopy(self.cv3)
+
+    def forward(self, x):
+        """Concatenates and returns predicted bounding boxes and class probabilities."""
+        if self.end2end:
+            return self.forward_end2end(x)
+
+        for i in range(self.nl):
+            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
+        if self.training:  # Training path
+            return x
+        y = self._inference(x)
+        return y if self.export else (y, x)
+
+    def forward_end2end(self, x):
+        """
+        Performs the end-to-end forward pass, producing one2many and one2one outputs.
+
+        Args:
+            x (List[torch.Tensor]): Multi-level input feature maps.
+
+        Returns:
+            (dict | torch.Tensor | tuple): In training mode, a dict with the raw "one2many" and "one2one" outputs.
+                In inference mode, the post-processed one2one predictions, paired with the raw outputs unless exporting.
+        """
+        x_detach = [xi.detach() for xi in x]
+        one2one = [
+            torch.cat((self.one2one_cv2[i](x_detach[i]), self.one2one_cv3[i](x_detach[i])), 1) for i in range(self.nl)
+        ]
+        for i in range(self.nl):
+            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
+        if self.training:  # Training path
+            return {"one2many": x, "one2one": one2one}
+
+        y = self._inference(one2one)
+        y = self.postprocess(y.permute(0, 2, 1), self.max_det, self.nc)
+        return y if self.export else (y, {"one2many": x, "one2one": one2one})
+
+    def _inference(self, x):
+        """Decode predicted bounding boxes and class probabilities based on multiple-level feature maps."""
+        # Inference path
+        shape = x[0].shape  # BCHW
+        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
+        if self.format != "imx" and (self.dynamic or self.shape != shape):
+            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
+            self.shape = shape
+
+        if self.export and self.format in {"saved_model", "pb", "tflite", "edgetpu", "tfjs"}:  # avoid TF FlexSplitV ops
+            box = x_cat[:, : self.reg_max * 4]
+            cls = x_cat[:, self.reg_max * 4 :]
+        else:
+            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
+
+        if self.export and self.format in {"tflite", "edgetpu"}:
+            # Precompute normalization factor to increase numerical stability
+            # See https://github.com/ultralytics/ultralytics/issues/7371
+            grid_h = shape[2]
+            grid_w = shape[3]
+            grid_size = torch.tensor([grid_w, grid_h, grid_w, grid_h], device=box.device).reshape(1, 4, 1)
+            norm = self.strides / (self.stride[0] * grid_size)
+            dbox = self.decode_bboxes(self.dfl(box) * norm, self.anchors.unsqueeze(0) * norm[:, :2])
+        elif self.export and self.format == "imx":
+            dbox = self.decode_bboxes(
+                self.dfl(box) * self.strides, self.anchors.unsqueeze(0) * self.strides, xywh=False
+            )
+            return dbox.transpose(1, 2), cls.sigmoid().permute(0, 2, 1)
+        else:
+            dbox = self.decode_bboxes(self.dfl(box), self.anchors.unsqueeze(0)) * self.strides
+
+        return torch.cat((dbox, cls.sigmoid()), 1)
+
+    def bias_init(self):
+        """Initialize Detect() biases, WARNING: requires stride availability."""
+        m = self  # self.model[-1]  # Detect() module
+        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
+        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
+        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
+            a[-1].bias.data[:] = 1.0  # box
+            b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
+        if self.end2end:
+            for a, b, s in zip(m.one2one_cv2, m.one2one_cv3, m.stride):  # from
+                a[-1].bias.data[:] = 1.0  # box
+                b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
+
+    def decode_bboxes(self, bboxes, anchors, xywh=True):
+        """Decode bounding boxes."""
+        return dist2bbox(bboxes, anchors, xywh=xywh and (not self.end2end), dim=1)
+
+    @staticmethod
+    def postprocess(preds: torch.Tensor, max_det: int, nc: int = 80):
+        """
+        Post-processes YOLO model predictions.
+
+        Args:
+            preds (torch.Tensor): Raw predictions with shape (batch_size, num_anchors, 4 + nc) with last dimension
+                format [x, y, w, h, class_probs].
+            max_det (int): Maximum detections per image.
+            nc (int, optional): Number of classes. Default: 80.
+
+        Returns:
+            (torch.Tensor): Processed predictions with shape (batch_size, min(max_det, num_anchors), 6) and last
+                dimension format [x, y, w, h, max_class_prob, class_index].
+        """
+        batch_size, anchors, _ = preds.shape  # i.e. shape(16,8400,84)
+        boxes, scores = preds.split([4, nc], dim=-1)
+        index = scores.amax(dim=-1).topk(min(max_det, anchors))[1].unsqueeze(-1)
+        boxes = boxes.gather(dim=1, index=index.repeat(1, 1, 4))
+        scores = scores.gather(dim=1, index=index.repeat(1, 1, nc))
+        scores, index = scores.flatten(1).topk(min(max_det, anchors))
+        i = torch.arange(batch_size)[..., None]  # batch indices
+        return torch.cat([boxes[i, index // nc], scores[..., None], (index % nc)[..., None].float()], dim=-1)
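+
+    # Usage sketch (illustrative shapes): postprocess selects the top-k anchors
+    # by class confidence without NMS, as used by the end-to-end one2one path:
+    #   >>> preds = torch.rand(2, 8400, 84)  # (batch, anchors, 4 + nc)
+    #   >>> Detect.postprocess(preds, max_det=300, nc=80).shape  # (2, 300, 6)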
+
+
+class Segment(Detect):
+    """YOLO Segment head for segmentation models."""
+
+    def __init__(self, nc=80, nm=32, npr=256, ch=()):
+        """Initialize the YOLO model attributes such as the number of masks, prototypes, and the convolution layers."""
+        super().__init__(nc, ch)
+        self.nm = nm  # number of masks
+        self.npr = npr  # number of protos
+        self.proto = Proto(ch[0], self.npr, self.nm)  # protos
+
+        c4 = max(ch[0] // 4, self.nm)
+        self.cv4 = nn.ModuleList(nn.Sequential(Conv(x, c4, 3), Conv(c4, c4, 3), nn.Conv2d(c4, self.nm, 1)) for x in ch)
+
+    def forward(self, x):
+        """Return model outputs and mask coefficients if training, otherwise return outputs and mask coefficients."""
+        p = self.proto(x[0])  # mask protos
+        bs = p.shape[0]  # batch size
+
+        mc = torch.cat([self.cv4[i](x[i]).view(bs, self.nm, -1) for i in range(self.nl)], 2)  # mask coefficients
+        x = Detect.forward(self, x)
+        if self.training:
+            return x, mc, p
+        return (torch.cat([x, mc], 1), p) if self.export else (torch.cat([x[0], mc], 1), (x[1], mc, p))
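+
+# Output sketch (illustrative, assuming a 640x640 input, nc=80, nm=32): at
+# inference the mask coefficients are concatenated onto the detections, giving
+# a (bs, 4 + 80 + 32, 8400) tensor plus (bs, 32, 160, 160) prototype masks.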
+
+
+class OBB(Detect):
+    """YOLO OBB detection head for detection with rotation models."""
+
+    def __init__(self, nc=80, ne=1, ch=()):
+        """Initialize OBB with number of classes `nc` and layer channels `ch`."""
+        super().__init__(nc, ch)
+        self.ne = ne  # number of extra parameters
+
+        c4 = max(ch[0] // 4, self.ne)
+        self.cv4 = nn.ModuleList(nn.Sequential(Conv(x, c4, 3), Conv(c4, c4, 3), nn.Conv2d(c4, self.ne, 1)) for x in ch)
+
+    def forward(self, x):
+        """Concatenates and returns predicted bounding boxes and class probabilities."""
+        bs = x[0].shape[0]  # batch size
+        angle = torch.cat([self.cv4[i](x[i]).view(bs, self.ne, -1) for i in range(self.nl)], 2)  # OBB theta logits
+        # NOTE: set `angle` as an attribute so that `decode_bboxes` could use it.
+        angle = (angle.sigmoid() - 0.25) * math.pi  # [-pi/4, 3pi/4]
+        # angle = angle.sigmoid() * math.pi / 2  # [0, pi/2]
+        if not self.training:
+            self.angle = angle
+        x = Detect.forward(self, x)
+        if self.training:
+            return x, angle
+        return torch.cat([x, angle], 1) if self.export else (torch.cat([x[0], angle], 1), (x[1], angle))
+
+    def decode_bboxes(self, bboxes, anchors):
+        """Decode rotated bounding boxes."""
+        return dist2rbox(bboxes, self.angle, anchors, dim=1)
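+
+    # Worked sketch (illustrative): the sigmoid output s in [0, 1] is mapped by
+    # (s - 0.25) * pi to [-pi/4, 3pi/4], so s=0.25 gives angle 0 and s=0.75
+    # gives pi/2, covering each box orientation exactly once over a pi range.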
+
+
+class Pose(Detect):
+    """YOLO Pose head for keypoints models."""
+
+    def __init__(self, nc=80, kpt_shape=(17, 3), ch=()):
+        """Initialize YOLO network with default parameters and Convolutional Layers."""
+        super().__init__(nc, ch)
+        self.kpt_shape = kpt_shape  # number of keypoints, number of dims (2 for x,y or 3 for x,y,visible)
+        self.nk = kpt_shape[0] * kpt_shape[1]  # number of keypoints total
+
+        c4 = max(ch[0] // 4, self.nk)
+        self.cv4 = nn.ModuleList(nn.Sequential(Conv(x, c4, 3), Conv(c4, c4, 3), nn.Conv2d(c4, self.nk, 1)) for x in ch)
+
+    def forward(self, x):
+        """Perform forward pass through YOLO model and return predictions."""
+        bs = x[0].shape[0]  # batch size
+        kpt = torch.cat([self.cv4[i](x[i]).view(bs, self.nk, -1) for i in range(self.nl)], -1)  # (bs, 17*3, h*w)
+        x = Detect.forward(self, x)
+        if self.training:
+            return x, kpt
+        pred_kpt = self.kpts_decode(bs, kpt)
+        return torch.cat([x, pred_kpt], 1) if self.export else (torch.cat([x[0], pred_kpt], 1), (x[1], kpt))
+
+    def kpts_decode(self, bs, kpts):
+        """Decodes keypoints."""
+        ndim = self.kpt_shape[1]
+        if self.export:
+            if self.format in {
+                "tflite",
+                "edgetpu",
+            }:  # required for TFLite export to avoid 'PLACEHOLDER_FOR_GREATER_OP_CODES' bug
+                # Precompute normalization factor to increase numerical stability
+                y = kpts.view(bs, *self.kpt_shape, -1)
+                grid_h, grid_w = self.shape[2], self.shape[3]
+                grid_size = torch.tensor([grid_w, grid_h], device=y.device).reshape(1, 2, 1)
+                norm = self.strides / (self.stride[0] * grid_size)
+                a = (y[:, :, :2] * 2.0 + (self.anchors - 0.5)) * norm
+            else:
+                # NCNN fix
+                y = kpts.view(bs, *self.kpt_shape, -1)
+                a = (y[:, :, :2] * 2.0 + (self.anchors - 0.5)) * self.strides
+            if ndim == 3:
+                a = torch.cat((a, y[:, :, 2:3].sigmoid()), 2)
+            return a.view(bs, self.nk, -1)
+        else:
+            y = kpts.clone()
+            if ndim == 3:
+                y[:, 2::3] = y[:, 2::3].sigmoid()  # sigmoid (WARNING: inplace .sigmoid_() Apple MPS bug)
+            y[:, 0::ndim] = (y[:, 0::ndim] * 2.0 + (self.anchors[0] - 0.5)) * self.strides
+            y[:, 1::ndim] = (y[:, 1::ndim] * 2.0 + (self.anchors[1] - 0.5)) * self.strides
+            return y
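+
+    # Decoding sketch (illustrative numbers): raw keypoint offsets map to image
+    # coordinates via (raw * 2 + anchor - 0.5) * stride; e.g. anchor x of 10.5
+    # at stride 8 with raw 0.2 gives (0.4 + 10.0) * 8 = 83.2 pixels.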
+
+
+class Classify(nn.Module):
+    """YOLO classification head, i.e. x(b,c1,20,20) to x(b,c2)."""
+
+    export = False  # export mode
+
+    def __init__(self, c1, c2, k=1, s=1, p=None, g=1):
+        """Initializes YOLO classification head to transform input tensor from (b,c1,20,20) to (b,c2) shape."""
+        super().__init__()
+        c_ = 1280  # efficientnet_b0 size
+        self.conv = Conv(c1, c_, k, s, p, g)
+        self.pool = nn.AdaptiveAvgPool2d(1)  # to x(b,c_,1,1)
+        self.drop = nn.Dropout(p=0.0, inplace=True)
+        self.linear = nn.Linear(c_, c2)  # to x(b,c2)
+
+    def forward(self, x):
+        """Performs a forward pass of the YOLO model on input image data."""
+        if isinstance(x, list):
+            x = torch.cat(x, 1)
+        x = self.linear(self.drop(self.pool(self.conv(x)).flatten(1)))
+        if self.training:
+            return x
+        y = x.softmax(1)  # get final output
+        return y if self.export else (y, x)
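+
+# Usage sketch (illustrative shapes): Classify pools any spatial size down to a
+# single vector before the linear classifier:
+#   >>> head = Classify(c1=256, c2=1000).eval()
+#   >>> probs, logits = head(torch.randn(1, 256, 20, 20))  # probs.shape == (1, 1000)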
+
+
+class WorldDetect(Detect):
+    """Head for integrating YOLO detection models with semantic understanding from text embeddings."""
+
+    def __init__(self, nc=80, embed=512, with_bn=False, ch=()):
+        """Initialize YOLO detection layer with nc classes and layer channels ch."""
+        super().__init__(nc, ch)
+        c3 = max(ch[0], min(self.nc, 100))
+        self.cv3 = nn.ModuleList(nn.Sequential(Conv(x, c3, 3), Conv(c3, c3, 3), nn.Conv2d(c3, embed, 1)) for x in ch)
+        self.cv4 = nn.ModuleList(BNContrastiveHead(embed) if with_bn else ContrastiveHead() for _ in ch)
+
+    def forward(self, x, text):
+        """Concatenates and returns predicted bounding boxes and class probabilities."""
+        for i in range(self.nl):
+            x[i] = torch.cat((self.cv2[i](x[i]), self.cv4[i](self.cv3[i](x[i]), text)), 1)
+        if self.training:
+            return x
+
+        # Inference path
+        shape = x[0].shape  # BCHW
+        x_cat = torch.cat([xi.view(shape[0], self.nc + self.reg_max * 4, -1) for xi in x], 2)
+        if self.dynamic or self.shape != shape:
+            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
+            self.shape = shape
+
+        if self.export and self.format in {"saved_model", "pb", "tflite", "edgetpu", "tfjs"}:  # avoid TF FlexSplitV ops
+            box = x_cat[:, : self.reg_max * 4]
+            cls = x_cat[:, self.reg_max * 4 :]
+        else:
+            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)
+
+        if self.export and self.format in {"tflite", "edgetpu"}:
+            # Precompute normalization factor to increase numerical stability
+            # See https://github.com/ultralytics/ultralytics/issues/7371
+            grid_h = shape[2]
+            grid_w = shape[3]
+            grid_size = torch.tensor([grid_w, grid_h, grid_w, grid_h], device=box.device).reshape(1, 4, 1)
+            norm = self.strides / (self.stride[0] * grid_size)
+            dbox = self.decode_bboxes(self.dfl(box) * norm, self.anchors.unsqueeze(0) * norm[:, :2])
+        else:
+            dbox = self.decode_bboxes(self.dfl(box), self.anchors.unsqueeze(0)) * self.strides
+
+        y = torch.cat((dbox, cls.sigmoid()), 1)
+        return y if self.export else (y, x)
+
+    def bias_init(self):
+        """Initialize Detect() biases, WARNING: requires stride availability."""
+        m = self  # self.model[-1]  # Detect() module
+        # cf = torch.bincount(torch.tensor(np.concatenate(dataset.labels, 0)[:, 0]).long(), minlength=nc) + 1
+        # ncf = math.log(0.6 / (m.nc - 0.999999)) if cf is None else torch.log(cf / cf.sum())  # nominal class frequency
+        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
+            a[-1].bias.data[:] = 1.0  # box
+            # b[-1].bias.data[:] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
+
+
+class RTDETRDecoder(nn.Module):
+    """
+    Real-Time Deformable Transformer Decoder (RTDETRDecoder) module for object detection.
+
+    This decoder module utilizes Transformer architecture along with deformable convolutions to predict bounding boxes
+    and class labels for objects in an image. It integrates features from multiple layers and runs through a series of
+    Transformer decoder layers to output the final predictions.
+    """
+
+    export = False  # export mode
+
+    def __init__(
+        self,
+        nc=80,
+        ch=(512, 1024, 2048),
+        hd=256,  # hidden dim
+        nq=300,  # num queries
+        ndp=4,  # num decoder points
+        nh=8,  # num head
+        ndl=6,  # num decoder layers
+        d_ffn=1024,  # dim of feedforward
+        dropout=0.0,
+        act=nn.ReLU(),
+        eval_idx=-1,
+        # Training args
+        nd=100,  # num denoising
+        label_noise_ratio=0.5,
+        box_noise_scale=1.0,
+        learnt_init_query=False,
+    ):
+        """
+        Initializes the RTDETRDecoder module with the given parameters.
+
+        Args:
+            nc (int): Number of classes. Default is 80.
+            ch (tuple): Channels in the backbone feature maps. Default is (512, 1024, 2048).
+            hd (int): Dimension of hidden layers. Default is 256.
+            nq (int): Number of query points. Default is 300.
+            ndp (int): Number of decoder points. Default is 4.
+            nh (int): Number of heads in multi-head attention. Default is 8.
+            ndl (int): Number of decoder layers. Default is 6.
+            d_ffn (int): Dimension of the feed-forward networks. Default is 1024.
+            dropout (float): Dropout rate. Default is 0.
+            act (nn.Module): Activation function. Default is nn.ReLU.
+            eval_idx (int): Evaluation index. Default is -1.
+            nd (int): Number of denoising. Default is 100.
+            label_noise_ratio (float): Label noise ratio. Default is 0.5.
+            box_noise_scale (float): Box noise scale. Default is 1.0.
+            learnt_init_query (bool): Whether to learn initial query embeddings. Default is False.
+        """
+        super().__init__()
+        self.hidden_dim = hd
+        self.nhead = nh
+        self.nl = len(ch)  # num level
+        self.nc = nc
+        self.num_queries = nq
+        self.num_decoder_layers = ndl
+
+        # Backbone feature projection
+        self.input_proj = nn.ModuleList(nn.Sequential(nn.Conv2d(x, hd, 1, bias=False), nn.BatchNorm2d(hd)) for x in ch)
+        # NOTE: simplified version but it's not consistent with .pt weights.
+        # self.input_proj = nn.ModuleList(Conv(x, hd, act=False) for x in ch)
+
+        # Transformer module
+        decoder_layer = DeformableTransformerDecoderLayer(hd, nh, d_ffn, dropout, act, self.nl, ndp)
+        self.decoder = DeformableTransformerDecoder(hd, decoder_layer, ndl, eval_idx)
+
+        # Denoising part
+        self.denoising_class_embed = nn.Embedding(nc, hd)
+        self.num_denoising = nd
+        self.label_noise_ratio = label_noise_ratio
+        self.box_noise_scale = box_noise_scale
+
+        # Decoder embedding
+        self.learnt_init_query = learnt_init_query
+        if learnt_init_query:
+            self.tgt_embed = nn.Embedding(nq, hd)
+        self.query_pos_head = MLP(4, 2 * hd, hd, num_layers=2)
+
+        # Encoder head
+        self.enc_output = nn.Sequential(nn.Linear(hd, hd), nn.LayerNorm(hd))
+        self.enc_score_head = nn.Linear(hd, nc)
+        self.enc_bbox_head = MLP(hd, hd, 4, num_layers=3)
+
+        # Decoder head
+        self.dec_score_head = nn.ModuleList([nn.Linear(hd, nc) for _ in range(ndl)])
+        self.dec_bbox_head = nn.ModuleList([MLP(hd, hd, 4, num_layers=3) for _ in range(ndl)])
+
+        self._reset_parameters()
+
+    def forward(self, x, batch=None):
+        """Runs the forward pass of the module, returning bounding box and classification scores for the input."""
+        from ultralytics.models.utils.ops import get_cdn_group
+
+        # Input projection and embedding
+        feats, shapes = self._get_encoder_input(x)
+
+        # Prepare denoising training
+        dn_embed, dn_bbox, attn_mask, dn_meta = get_cdn_group(
+            batch,
+            self.nc,
+            self.num_queries,
+            self.denoising_class_embed.weight,
+            self.num_denoising,
+            self.label_noise_ratio,
+            self.box_noise_scale,
+            self.training,
+        )
+
+        embed, refer_bbox, enc_bboxes, enc_scores = self._get_decoder_input(feats, shapes, dn_embed, dn_bbox)
+
+        # Decoder
+        dec_bboxes, dec_scores = self.decoder(
+            embed,
+            refer_bbox,
+            feats,
+            shapes,
+            self.dec_bbox_head,
+            self.dec_score_head,
+            self.query_pos_head,
+            attn_mask=attn_mask,
+        )
+        x = dec_bboxes, dec_scores, enc_bboxes, enc_scores, dn_meta
+        if self.training:
+            return x
+        # (bs, 300, 4+nc)
+        y = torch.cat((dec_bboxes.squeeze(0), dec_scores.squeeze(0).sigmoid()), -1)
+        return y if self.export else (y, x)
+
+    def _generate_anchors(self, shapes, grid_size=0.05, dtype=torch.float32, device="cpu", eps=1e-2):
+        """Generates anchor bounding boxes for given shapes with specific grid size and validates them."""
+        anchors = []
+        for i, (h, w) in enumerate(shapes):
+            sy = torch.arange(end=h, dtype=dtype, device=device)
+            sx = torch.arange(end=w, dtype=dtype, device=device)
+            grid_y, grid_x = torch.meshgrid(sy, sx, indexing="ij") if TORCH_1_10 else torch.meshgrid(sy, sx)
+            grid_xy = torch.stack([grid_x, grid_y], -1)  # (h, w, 2)
+
+            valid_WH = torch.tensor([w, h], dtype=dtype, device=device)
+            grid_xy = (grid_xy.unsqueeze(0) + 0.5) / valid_WH  # (1, h, w, 2)
+            wh = torch.ones_like(grid_xy, dtype=dtype, device=device) * grid_size * (2.0**i)
+            anchors.append(torch.cat([grid_xy, wh], -1).view(-1, h * w, 4))  # (1, h*w, 4)
+
+        anchors = torch.cat(anchors, 1)  # (1, h*w*nl, 4)
+        valid_mask = ((anchors > eps) & (anchors < 1 - eps)).all(-1, keepdim=True)  # 1, h*w*nl, 1
+        anchors = torch.log(anchors / (1 - anchors))
+        anchors = anchors.masked_fill(~valid_mask, float("inf"))
+        return anchors, valid_mask
+
+    def _get_encoder_input(self, x):
+        """Processes and returns encoder inputs by getting projection features from input and concatenating them."""
+        # Get projection features
+        x = [self.input_proj[i](feat) for i, feat in enumerate(x)]
+        # Get encoder inputs
+        feats = []
+        shapes = []
+        for feat in x:
+            h, w = feat.shape[2:]
+            # [b, c, h, w] -> [b, h*w, c]
+            feats.append(feat.flatten(2).permute(0, 2, 1))
+            # [nl, 2]
+            shapes.append([h, w])
+
+        # [b, h*w, c]
+        feats = torch.cat(feats, 1)
+        return feats, shapes
+
+    def _get_decoder_input(self, feats, shapes, dn_embed=None, dn_bbox=None):
+        """Generates and prepares the input required for the decoder from the provided features and shapes."""
+        bs = feats.shape[0]
+        # Prepare input for decoder
+        anchors, valid_mask = self._generate_anchors(shapes, dtype=feats.dtype, device=feats.device)
+        features = self.enc_output(valid_mask * feats)  # bs, h*w, 256
+
+        enc_outputs_scores = self.enc_score_head(features)  # (bs, h*w, nc)
+
+        # Query selection
+        # (bs, num_queries)
+        topk_ind = torch.topk(enc_outputs_scores.max(-1).values, self.num_queries, dim=1).indices.view(-1)
+        # (bs, num_queries)
+        batch_ind = torch.arange(end=bs, dtype=topk_ind.dtype).unsqueeze(-1).repeat(1, self.num_queries).view(-1)
+
+        # (bs, num_queries, 256)
+        top_k_features = features[batch_ind, topk_ind].view(bs, self.num_queries, -1)
+        # (bs, num_queries, 4)
+        top_k_anchors = anchors[:, topk_ind].view(bs, self.num_queries, -1)
+
+        # Dynamic anchors + static content
+        refer_bbox = self.enc_bbox_head(top_k_features) + top_k_anchors
+
+        enc_bboxes = refer_bbox.sigmoid()
+        if dn_bbox is not None:
+            refer_bbox = torch.cat([dn_bbox, refer_bbox], 1)
+        enc_scores = enc_outputs_scores[batch_ind, topk_ind].view(bs, self.num_queries, -1)
+
+        embeddings = self.tgt_embed.weight.unsqueeze(0).repeat(bs, 1, 1) if self.learnt_init_query else top_k_features
+        if self.training:
+            refer_bbox = refer_bbox.detach()
+            if not self.learnt_init_query:
+                embeddings = embeddings.detach()
+        if dn_embed is not None:
+            embeddings = torch.cat([dn_embed, embeddings], 1)
+
+        return embeddings, refer_bbox, enc_bboxes, enc_scores
+
+    # TODO
+    def _reset_parameters(self):
+        """Initializes or resets the parameters of the model's various components with predefined weights and biases."""
+        # Class and bbox head init
+        bias_cls = bias_init_with_prob(0.01) / 80 * self.nc
+        # NOTE: the weight initialization in `linear_init` would cause NaN when training with custom datasets.
+        # linear_init(self.enc_score_head)
+        constant_(self.enc_score_head.bias, bias_cls)
+        constant_(self.enc_bbox_head.layers[-1].weight, 0.0)
+        constant_(self.enc_bbox_head.layers[-1].bias, 0.0)
+        for cls_, reg_ in zip(self.dec_score_head, self.dec_bbox_head):
+            # linear_init(cls_)
+            constant_(cls_.bias, bias_cls)
+            constant_(reg_.layers[-1].weight, 0.0)
+            constant_(reg_.layers[-1].bias, 0.0)
+
+        linear_init(self.enc_output[0])
+        xavier_uniform_(self.enc_output[0].weight)
+        if self.learnt_init_query:
+            xavier_uniform_(self.tgt_embed.weight)
+        xavier_uniform_(self.query_pos_head.layers[0].weight)
+        xavier_uniform_(self.query_pos_head.layers[1].weight)
+        for layer in self.input_proj:
+            xavier_uniform_(layer[0].weight)
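+
+# Inference sketch (illustrative): RTDETRDecoder emits a (bs, 300, 4 + nc)
+# tensor of sigmoid cxcywh boxes fused with class scores, so no anchors,
+# strides or NMS are needed downstream.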
+
+
+class v10Detect(Detect):
+    """
+    v10 Detection head from https://arxiv.org/pdf/2405.14458.
+
+    Args:
+        nc (int): Number of classes.
+        ch (tuple): Tuple of channel sizes.
+
+    Attributes:
+        max_det (int): Maximum number of detections.
+
+    Methods:
+        __init__(self, nc=80, ch=()): Initializes the v10Detect object.
+        forward(self, x): Performs forward pass of the v10Detect module.
+        bias_init(self): Initializes biases of the Detect module.
+    """
+
+    end2end = True
+
+    def __init__(self, nc=80, ch=()):
+        """Initializes the v10Detect object with the specified number of classes and input channels."""
+        super().__init__(nc, ch)
+        c3 = max(ch[0], min(self.nc, 100))  # channels
+        # Light cls head
+        self.cv3 = nn.ModuleList(
+            nn.Sequential(
+                nn.Sequential(Conv(x, x, 3, g=x), Conv(x, c3, 1)),
+                nn.Sequential(Conv(c3, c3, 3, g=c3), Conv(c3, c3, 1)),
+                nn.Conv2d(c3, self.nc, 1),
+            )
+            for x in ch
+        )
+        self.one2one_cv3 = copy.deepcopy(self.cv3)
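+
+# Design note: v10Detect sets end2end=True, so inference uses the one2one
+# branch plus Detect.postprocess top-k selection instead of NMS, while the
+# depthwise separable cls stems (Conv with g=x) keep the extra branch light.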

+ 427 - 0
ultralytics/nn/modules/transformer.py

@@ -0,0 +1,427 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""Transformer modules."""
+
+import math
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torch.nn.init import constant_, xavier_uniform_
+
+from .conv import Conv
+from .utils import _get_clones, inverse_sigmoid, multi_scale_deformable_attn_pytorch
+
+__all__ = (
+    "TransformerEncoderLayer",
+    "TransformerLayer",
+    "TransformerBlock",
+    "MLPBlock",
+    "LayerNorm2d",
+    "AIFI",
+    "DeformableTransformerDecoder",
+    "DeformableTransformerDecoderLayer",
+    "MSDeformAttn",
+    "MLP",
+)
+
+
+class TransformerEncoderLayer(nn.Module):
+    """Defines a single layer of the transformer encoder."""
+
+    def __init__(self, c1, cm=2048, num_heads=8, dropout=0.0, act=nn.GELU(), normalize_before=False):
+        """Initialize the TransformerEncoderLayer with specified parameters."""
+        super().__init__()
+        from ...utils.torch_utils import TORCH_1_9
+
+        if not TORCH_1_9:
+            raise ModuleNotFoundError(
+                "TransformerEncoderLayer() requires torch>=1.9 to use nn.MultiheadAttention(batch_first=True)."
+            )
+        self.ma = nn.MultiheadAttention(c1, num_heads, dropout=dropout, batch_first=True)
+        # Implementation of Feedforward model
+        self.fc1 = nn.Linear(c1, cm)
+        self.fc2 = nn.Linear(cm, c1)
+
+        self.norm1 = nn.LayerNorm(c1)
+        self.norm2 = nn.LayerNorm(c1)
+        self.dropout = nn.Dropout(dropout)
+        self.dropout1 = nn.Dropout(dropout)
+        self.dropout2 = nn.Dropout(dropout)
+
+        self.act = act
+        self.normalize_before = normalize_before
+
+    @staticmethod
+    def with_pos_embed(tensor, pos=None):
+        """Add position embeddings to the tensor if provided."""
+        return tensor if pos is None else tensor + pos
+
+    def forward_post(self, src, src_mask=None, src_key_padding_mask=None, pos=None):
+        """Performs forward pass with post-normalization."""
+        q = k = self.with_pos_embed(src, pos)
+        src2 = self.ma(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
+        src = src + self.dropout1(src2)
+        src = self.norm1(src)
+        src2 = self.fc2(self.dropout(self.act(self.fc1(src))))
+        src = src + self.dropout2(src2)
+        return self.norm2(src)
+
+    def forward_pre(self, src, src_mask=None, src_key_padding_mask=None, pos=None):
+        """Performs forward pass with pre-normalization."""
+        src2 = self.norm1(src)
+        q = k = self.with_pos_embed(src2, pos)
+        src2 = self.ma(q, k, value=src2, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
+        src = src + self.dropout1(src2)
+        src2 = self.norm2(src)
+        src2 = self.fc2(self.dropout(self.act(self.fc1(src2))))
+        return src + self.dropout2(src2)
+
+    def forward(self, src, src_mask=None, src_key_padding_mask=None, pos=None):
+        """Forward propagates the input through the encoder module."""
+        if self.normalize_before:
+            return self.forward_pre(src, src_mask, src_key_padding_mask, pos)
+        return self.forward_post(src, src_mask, src_key_padding_mask, pos)
+
+
+class AIFI(TransformerEncoderLayer):
+    """Defines the AIFI transformer layer."""
+
+    def __init__(self, c1, cm=2048, num_heads=8, dropout=0, act=nn.GELU(), normalize_before=False):
+        """Initialize the AIFI instance with specified parameters."""
+        super().__init__(c1, cm, num_heads, dropout, act, normalize_before)
+
+    def forward(self, x):
+        """Forward pass for the AIFI transformer layer."""
+        c, h, w = x.shape[1:]
+        pos_embed = self.build_2d_sincos_position_embedding(w, h, c)
+        # Flatten [B, C, H, W] to [B, HxW, C]
+        x = super().forward(x.flatten(2).permute(0, 2, 1), pos=pos_embed.to(device=x.device, dtype=x.dtype))
+        return x.permute(0, 2, 1).view([-1, c, h, w]).contiguous()
+
+    @staticmethod
+    def build_2d_sincos_position_embedding(w, h, embed_dim=256, temperature=10000.0):
+        """Builds 2D sine-cosine position embedding."""
+        assert embed_dim % 4 == 0, "Embed dimension must be divisible by 4 for 2D sin-cos position embedding"
+        grid_w = torch.arange(w, dtype=torch.float32)
+        grid_h = torch.arange(h, dtype=torch.float32)
+        grid_w, grid_h = torch.meshgrid(grid_w, grid_h, indexing="ij")
+        pos_dim = embed_dim // 4
+        omega = torch.arange(pos_dim, dtype=torch.float32) / pos_dim
+        omega = 1.0 / (temperature**omega)
+
+        out_w = grid_w.flatten()[..., None] @ omega[None]
+        out_h = grid_h.flatten()[..., None] @ omega[None]
+
+        return torch.cat([torch.sin(out_w), torch.cos(out_w), torch.sin(out_h), torch.cos(out_h)], 1)[None]
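+
+    # Worked sketch (illustrative): for w=h=20 and embed_dim=256, pos_dim is 64
+    # frequencies per axis and the result has shape (1, 400, 256), with the
+    # sin/cos pairs for x in channels [0:128] and for y in channels [128:256].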
+
+
+class TransformerLayer(nn.Module):
+    """Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance)."""
+
+    def __init__(self, c, num_heads):
+        """Initializes a self-attention mechanism using linear transformations and multi-head attention."""
+        super().__init__()
+        self.q = nn.Linear(c, c, bias=False)
+        self.k = nn.Linear(c, c, bias=False)
+        self.v = nn.Linear(c, c, bias=False)
+        self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads)
+        self.fc1 = nn.Linear(c, c, bias=False)
+        self.fc2 = nn.Linear(c, c, bias=False)
+
+    def forward(self, x):
+        """Apply a transformer block to the input x and return the output."""
+        x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x
+        return self.fc2(self.fc1(x)) + x
+
+
+class TransformerBlock(nn.Module):
+    """Vision Transformer https://arxiv.org/abs/2010.11929."""
+
+    def __init__(self, c1, c2, num_heads, num_layers):
+        """Initialize a Transformer module with position embedding and specified number of heads and layers."""
+        super().__init__()
+        self.conv = None
+        if c1 != c2:
+            self.conv = Conv(c1, c2)
+        self.linear = nn.Linear(c2, c2)  # learnable position embedding
+        self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers)))
+        self.c2 = c2
+
+    def forward(self, x):
+        """Forward propagates the input through the bottleneck module."""
+        if self.conv is not None:
+            x = self.conv(x)
+        b, _, w, h = x.shape
+        p = x.flatten(2).permute(2, 0, 1)
+        return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h)
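+
+# Usage sketch (illustrative shapes): TransformerBlock flattens HxW into a
+# token sequence, attends, then restores the spatial map:
+#   >>> blk = TransformerBlock(c1=128, c2=256, num_heads=8, num_layers=2)
+#   >>> y = blk(torch.randn(1, 128, 16, 16))  # y.shape == (1, 256, 16, 16)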
+
+
+class MLPBlock(nn.Module):
+    """Implements a single block of a multi-layer perceptron."""
+
+    def __init__(self, embedding_dim, mlp_dim, act=nn.GELU):
+        """Initialize the MLPBlock with specified embedding dimension, MLP dimension, and activation function."""
+        super().__init__()
+        self.lin1 = nn.Linear(embedding_dim, mlp_dim)
+        self.lin2 = nn.Linear(mlp_dim, embedding_dim)
+        self.act = act()
+
+    def forward(self, x: torch.Tensor) -> torch.Tensor:
+        """Forward pass for the MLPBlock."""
+        return self.lin2(self.act(self.lin1(x)))
+
+
+class MLP(nn.Module):
+    """Implements a simple multi-layer perceptron (also called FFN)."""
+
+    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, act=nn.ReLU, sigmoid=False):
+        """Initialize the MLP with specified input, hidden, output dimensions and number of layers."""
+        super().__init__()
+        self.num_layers = num_layers
+        h = [hidden_dim] * (num_layers - 1)
+        self.layers = nn.ModuleList(nn.Linear(n, k) for n, k in zip([input_dim] + h, h + [output_dim]))
+        self.sigmoid = sigmoid
+        self.act = act()
+
+    def forward(self, x):
+        """Forward pass for the entire MLP."""
+        for i, layer in enumerate(self.layers):
+            x = getattr(self, "act", nn.ReLU())(layer(x)) if i < self.num_layers - 1 else layer(x)
+        return x.sigmoid() if getattr(self, "sigmoid", False) else x
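+
+# Usage sketch (illustrative): a 3-layer MLP as used by the bbox heads maps a
+# 256-d feature to 4 box parameters:
+#   >>> mlp = MLP(input_dim=256, hidden_dim=256, output_dim=4, num_layers=3)
+#   >>> mlp(torch.randn(2, 300, 256)).shape  # torch.Size([2, 300, 4])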
+
+
+class LayerNorm2d(nn.Module):
+    """
+    2D Layer Normalization module inspired by Detectron2 and ConvNeXt implementations.
+
+    Original implementations in
+    https://github.com/facebookresearch/detectron2/blob/main/detectron2/layers/batch_norm.py
+    and
+    https://github.com/facebookresearch/ConvNeXt/blob/main/models/convnext.py.
+    """
+
+    def __init__(self, num_channels, eps=1e-6):
+        """Initialize LayerNorm2d with the given parameters."""
+        super().__init__()
+        self.weight = nn.Parameter(torch.ones(num_channels))
+        self.bias = nn.Parameter(torch.zeros(num_channels))
+        self.eps = eps
+
+    def forward(self, x):
+        """Perform forward pass for 2D layer normalization."""
+        u = x.mean(1, keepdim=True)
+        s = (x - u).pow(2).mean(1, keepdim=True)
+        x = (x - u) / torch.sqrt(s + self.eps)
+        return self.weight[:, None, None] * x + self.bias[:, None, None]
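+
+# Note: unlike nn.LayerNorm, which normalizes trailing dimensions, LayerNorm2d
+# normalizes over the channel dimension of an NCHW tensor at each pixel:
+#   >>> ln = LayerNorm2d(num_channels=64)
+#   >>> y = ln(torch.randn(2, 64, 8, 8))  # y.shape == (2, 64, 8, 8)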
+
+
+class MSDeformAttn(nn.Module):
+    """
+    Multiscale Deformable Attention Module based on Deformable-DETR and PaddleDetection implementations.
+
+    https://github.com/fundamentalvision/Deformable-DETR/blob/main/models/ops/modules/ms_deform_attn.py
+    """
+
+    def __init__(self, d_model=256, n_levels=4, n_heads=8, n_points=4):
+        """Initialize MSDeformAttn with the given parameters."""
+        super().__init__()
+        if d_model % n_heads != 0:
+            raise ValueError(f"d_model must be divisible by n_heads, but got {d_model} and {n_heads}")
+        _d_per_head = d_model // n_heads
+        # Better to set _d_per_head to a power of 2 which is more efficient in a CUDA implementation
+        assert _d_per_head * n_heads == d_model, "`d_model` must be divisible by `n_heads`"
+
+        self.im2col_step = 64
+
+        self.d_model = d_model
+        self.n_levels = n_levels
+        self.n_heads = n_heads
+        self.n_points = n_points
+
+        self.sampling_offsets = nn.Linear(d_model, n_heads * n_levels * n_points * 2)
+        self.attention_weights = nn.Linear(d_model, n_heads * n_levels * n_points)
+        self.value_proj = nn.Linear(d_model, d_model)
+        self.output_proj = nn.Linear(d_model, d_model)
+
+        self._reset_parameters()
+
+    def _reset_parameters(self):
+        """Reset module parameters."""
+        constant_(self.sampling_offsets.weight.data, 0.0)
+        thetas = torch.arange(self.n_heads, dtype=torch.float32) * (2.0 * math.pi / self.n_heads)
+        grid_init = torch.stack([thetas.cos(), thetas.sin()], -1)
+        grid_init = (
+            (grid_init / grid_init.abs().max(-1, keepdim=True)[0])
+            .view(self.n_heads, 1, 1, 2)
+            .repeat(1, self.n_levels, self.n_points, 1)
+        )
+        for i in range(self.n_points):
+            grid_init[:, :, i, :] *= i + 1
+        with torch.no_grad():
+            self.sampling_offsets.bias = nn.Parameter(grid_init.view(-1))
+        constant_(self.attention_weights.weight.data, 0.0)
+        constant_(self.attention_weights.bias.data, 0.0)
+        xavier_uniform_(self.value_proj.weight.data)
+        constant_(self.value_proj.bias.data, 0.0)
+        xavier_uniform_(self.output_proj.weight.data)
+        constant_(self.output_proj.bias.data, 0.0)
+
+    def forward(self, query, refer_bbox, value, value_shapes, value_mask=None):
+        """
+        Perform forward pass for multiscale deformable attention.
+
+        https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py
+
+        Args:
+            query (torch.Tensor): [bs, query_length, C]
+            refer_bbox (torch.Tensor): [bs, query_length, n_levels, 2], range in [0, 1], top-left (0,0),
+                bottom-right (1, 1), including padding area
+            value (torch.Tensor): [bs, value_length, C]
+            value_shapes (List): [n_levels, 2], [(H_0, W_0), (H_1, W_1), ..., (H_{L-1}, W_{L-1})]
+            value_mask (torch.Tensor): [bs, value_length], True for non-padding elements, False for padding elements
+
+        Returns:
+            output (torch.Tensor): [bs, query_length, C]
+        """
+        bs, len_q = query.shape[:2]
+        len_v = value.shape[1]
+        assert sum(s[0] * s[1] for s in value_shapes) == len_v
+
+        value = self.value_proj(value)
+        if value_mask is not None:
+            value = value.masked_fill(value_mask[..., None], float(0))
+        value = value.view(bs, len_v, self.n_heads, self.d_model // self.n_heads)
+        sampling_offsets = self.sampling_offsets(query).view(bs, len_q, self.n_heads, self.n_levels, self.n_points, 2)
+        attention_weights = self.attention_weights(query).view(bs, len_q, self.n_heads, self.n_levels * self.n_points)
+        attention_weights = F.softmax(attention_weights, -1).view(bs, len_q, self.n_heads, self.n_levels, self.n_points)
+        # N, Len_q, n_heads, n_levels, n_points, 2
+        num_points = refer_bbox.shape[-1]
+        if num_points == 2:
+            offset_normalizer = torch.as_tensor(value_shapes, dtype=query.dtype, device=query.device).flip(-1)
+            add = sampling_offsets / offset_normalizer[None, None, None, :, None, :]
+            sampling_locations = refer_bbox[:, :, None, :, None, :] + add
+        elif num_points == 4:
+            add = sampling_offsets / self.n_points * refer_bbox[:, :, None, :, None, 2:] * 0.5
+            sampling_locations = refer_bbox[:, :, None, :, None, :2] + add
+        else:
+            raise ValueError(f"Last dim of reference_points must be 2 or 4, but got {num_points}.")
+        output = multi_scale_deformable_attn_pytorch(value, value_shapes, sampling_locations, attention_weights)
+        return self.output_proj(output)
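+
+    # Shape sketch (illustrative, d_model=256, n_heads=8, n_levels=4,
+    # n_points=4): a (bs, 300, 256) query attends to a (bs, sum(H*W), 256)
+    # value by sampling only 8 * 4 * 4 = 128 positions per query, rather than
+    # all sum(H*W) positions as in dense attention.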
+
+
+class DeformableTransformerDecoderLayer(nn.Module):
+    """
+    Deformable Transformer Decoder Layer inspired by PaddleDetection and Deformable-DETR implementations.
+
+    https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py
+    https://github.com/fundamentalvision/Deformable-DETR/blob/main/models/deformable_transformer.py
+    """
+
+    def __init__(self, d_model=256, n_heads=8, d_ffn=1024, dropout=0.0, act=nn.ReLU(), n_levels=4, n_points=4):
+        """Initialize the DeformableTransformerDecoderLayer with the given parameters."""
+        super().__init__()
+
+        # Self attention
+        self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout)
+        self.dropout1 = nn.Dropout(dropout)
+        self.norm1 = nn.LayerNorm(d_model)
+
+        # Cross attention
+        self.cross_attn = MSDeformAttn(d_model, n_levels, n_heads, n_points)
+        self.dropout2 = nn.Dropout(dropout)
+        self.norm2 = nn.LayerNorm(d_model)
+
+        # FFN
+        self.linear1 = nn.Linear(d_model, d_ffn)
+        self.act = act
+        self.dropout3 = nn.Dropout(dropout)
+        self.linear2 = nn.Linear(d_ffn, d_model)
+        self.dropout4 = nn.Dropout(dropout)
+        self.norm3 = nn.LayerNorm(d_model)
+
+    @staticmethod
+    def with_pos_embed(tensor, pos):
+        """Add positional embeddings to the input tensor, if provided."""
+        return tensor if pos is None else tensor + pos
+
+    def forward_ffn(self, tgt):
+        """Perform forward pass through the Feed-Forward Network part of the layer."""
+        tgt2 = self.linear2(self.dropout3(self.act(self.linear1(tgt))))
+        tgt = tgt + self.dropout4(tgt2)
+        return self.norm3(tgt)
+
+    def forward(self, embed, refer_bbox, feats, shapes, padding_mask=None, attn_mask=None, query_pos=None):
+        """Perform the forward pass through the entire decoder layer."""
+        # Self attention
+        q = k = self.with_pos_embed(embed, query_pos)
+        tgt = self.self_attn(q.transpose(0, 1), k.transpose(0, 1), embed.transpose(0, 1), attn_mask=attn_mask)[
+            0
+        ].transpose(0, 1)
+        embed = embed + self.dropout1(tgt)
+        embed = self.norm1(embed)
+
+        # Cross attention
+        tgt = self.cross_attn(
+            self.with_pos_embed(embed, query_pos), refer_bbox.unsqueeze(2), feats, shapes, padding_mask
+        )
+        embed = embed + self.dropout2(tgt)
+        embed = self.norm2(embed)
+
+        # FFN
+        return self.forward_ffn(embed)
+
+
+class DeformableTransformerDecoder(nn.Module):
+    """
+    Implementation of Deformable Transformer Decoder based on PaddleDetection.
+
+    https://github.com/PaddlePaddle/PaddleDetection/blob/develop/ppdet/modeling/transformers/deformable_transformer.py
+    """
+
+    def __init__(self, hidden_dim, decoder_layer, num_layers, eval_idx=-1):
+        """Initialize the DeformableTransformerDecoder with the given parameters."""
+        super().__init__()
+        self.layers = _get_clones(decoder_layer, num_layers)
+        self.num_layers = num_layers
+        self.hidden_dim = hidden_dim
+        self.eval_idx = eval_idx if eval_idx >= 0 else num_layers + eval_idx
+
+    def forward(
+        self,
+        embed,  # decoder embeddings
+        refer_bbox,  # anchor
+        feats,  # image features
+        shapes,  # feature shapes
+        bbox_head,
+        score_head,
+        pos_mlp,
+        attn_mask=None,
+        padding_mask=None,
+    ):
+        """Perform the forward pass through the entire decoder."""
+        output = embed
+        dec_bboxes = []
+        dec_cls = []
+        last_refined_bbox = None
+        refer_bbox = refer_bbox.sigmoid()
+        for i, layer in enumerate(self.layers):
+            output = layer(output, refer_bbox, feats, shapes, padding_mask, attn_mask, pos_mlp(refer_bbox))
+
+            bbox = bbox_head[i](output)
+            refined_bbox = torch.sigmoid(bbox + inverse_sigmoid(refer_bbox))
+
+            if self.training:
+                dec_cls.append(score_head[i](output))
+                if i == 0:
+                    dec_bboxes.append(refined_bbox)
+                else:
+                    dec_bboxes.append(torch.sigmoid(bbox + inverse_sigmoid(last_refined_bbox)))
+            elif i == self.eval_idx:
+                dec_cls.append(score_head[i](output))
+                dec_bboxes.append(refined_bbox)
+                break
+
+            last_refined_bbox = refined_bbox
+            refer_bbox = refined_bbox.detach() if self.training else refined_bbox
+
+        return torch.stack(dec_bboxes), torch.stack(dec_cls)

+ 84 - 0
ultralytics/nn/modules/utils.py

@@ -0,0 +1,84 @@
+# Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
+"""Module utils."""
+
+import copy
+import math
+
+import numpy as np
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+from torch.nn.init import uniform_
+
+__all__ = "multi_scale_deformable_attn_pytorch", "inverse_sigmoid"
+
+
+def _get_clones(module, n):
+    """Create a list of cloned modules from the given module."""
+    return nn.ModuleList([copy.deepcopy(module) for _ in range(n)])
+
+
+def bias_init_with_prob(prior_prob=0.01):
+    """Initialize conv/fc bias value according to a given probability value."""
+    return float(-np.log((1 - prior_prob) / prior_prob))  # return bias_init
+
+
+def linear_init(module):
+    """Initialize the weights and biases of a linear module."""
+    bound = 1 / math.sqrt(module.weight.shape[0])
+    uniform_(module.weight, -bound, bound)
+    if hasattr(module, "bias") and module.bias is not None:
+        uniform_(module.bias, -bound, bound)
+
+
+def inverse_sigmoid(x, eps=1e-5):
+    """Calculate the inverse sigmoid function for a tensor."""
+    x = x.clamp(min=0, max=1)
+    x1 = x.clamp(min=eps)
+    x2 = (1 - x).clamp(min=eps)
+    return torch.log(x1 / x2)
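+
+# Worked sketch: inverse_sigmoid is the logit function with clamping for
+# numerical stability, so sigmoid(inverse_sigmoid(x)) ~= x on (eps, 1 - eps):
+#   >>> inverse_sigmoid(torch.tensor([0.5, 0.9]))  # tensor([0.0000, 2.1972])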
+
+
+def multi_scale_deformable_attn_pytorch(
+    value: torch.Tensor,
+    value_spatial_shapes: torch.Tensor,
+    sampling_locations: torch.Tensor,
+    attention_weights: torch.Tensor,
+) -> torch.Tensor:
+    """
+    Multiscale deformable attention.
+
+    https://github.com/IDEA-Research/detrex/blob/main/detrex/layers/multi_scale_deform_attn.py
+    """
+    bs, _, num_heads, embed_dims = value.shape
+    _, num_queries, num_heads, num_levels, num_points, _ = sampling_locations.shape
+    value_list = value.split([H_ * W_ for H_, W_ in value_spatial_shapes], dim=1)
+    sampling_grids = 2 * sampling_locations - 1
+    sampling_value_list = []
+    for level, (H_, W_) in enumerate(value_spatial_shapes):
+        # bs, H_*W_, num_heads, embed_dims ->
+        # bs, H_*W_, num_heads*embed_dims ->
+        # bs, num_heads*embed_dims, H_*W_ ->
+        # bs*num_heads, embed_dims, H_, W_
+        value_l_ = value_list[level].flatten(2).transpose(1, 2).reshape(bs * num_heads, embed_dims, H_, W_)
+        # bs, num_queries, num_heads, num_points, 2 ->
+        # bs, num_heads, num_queries, num_points, 2 ->
+        # bs*num_heads, num_queries, num_points, 2
+        sampling_grid_l_ = sampling_grids[:, :, :, level].transpose(1, 2).flatten(0, 1)
+        # bs*num_heads, embed_dims, num_queries, num_points
+        sampling_value_l_ = F.grid_sample(
+            value_l_, sampling_grid_l_, mode="bilinear", padding_mode="zeros", align_corners=False
+        )
+        sampling_value_list.append(sampling_value_l_)
+    # (bs, num_queries, num_heads, num_levels, num_points) ->
+    # (bs, num_heads, num_queries, num_levels, num_points) ->
+    # (bs, num_heads, 1, num_queries, num_levels*num_points)
+    attention_weights = attention_weights.transpose(1, 2).reshape(
+        bs * num_heads, 1, num_queries, num_levels * num_points
+    )
+    output = (
+        (torch.stack(sampling_value_list, dim=-2).flatten(-2) * attention_weights)
+        .sum(-1)
+        .view(bs, num_heads * embed_dims, num_queries)
+    )
+    return output.transpose(1, 2).contiguous()
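+
+# Shape sketch (illustrative): with bs=1, two levels of (8, 8) and (4, 4),
+# 8 heads of dim 32, 300 queries and 4 points, the inputs are value
+# (1, 80, 8, 32), sampling_locations (1, 300, 8, 2, 4, 2) and
+# attention_weights (1, 300, 8, 2, 4); the output is (1, 300, 256).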

The diff for this file is not shown because it is too large
+ 1190 - 0
ultralytics/nn/tasks.py