# 2024-04-21 00:02:47 +08:00
import math
import torch
import torch.nn as nn
import cv2 as cv
import numpy as np
import torchvision
from pathlib import Path
from PIL import Image
from torch import Tensor
from torch.nn import Module
from numpy import ndarray, linalg
from torchvision import transforms
from ortools.algorithms.pywrapknapsack_solver import KnapsackSolver
from config import DSNetConfig
from typing import Iterable
class FeatureExtractor(object):
    """Extract one L2-normalised 1024-d feature vector per image with GoogLeNet."""

    def __init__(self):
        """Build the preprocessing pipeline and the truncated GoogLeNet backbone."""
        super(FeatureExtractor, self).__init__()
        # ``run`` feeds a PIL image into this pipeline, while ``Normalize``
        # only works on tensors, so ``ToTensor`` has to come before it.
        # NOTE(review): the transform list was truncated in the reviewed
        # source; Resize(256)/CenterCrop(224) is the usual ImageNet
        # preprocessing for torchvision classification models - confirm it
        # matches the features the downstream model was trained on.
        self.preprocess = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.model = torchvision.models.googlenet(pretrained=True)
        # Drop the last two child modules (presumably dropout and the
        # classifier) so that the network outputs pooled features.
        self.model = torch.nn.Sequential(*list(self.model.children())[:-2])
        self.model = self.model.to(DSNetConfig.device).eval()

    def run(self, img: ndarray) -> ndarray:
        """Return the unit-norm feature vector of a single image.

        Args:
            img: Image as an array accepted by ``PIL.Image.fromarray``
                (the caller in this file passes an RGB frame).

        Returns:
            A 1-D array of length 1024, divided by its L2 norm.

        Raises:
            AssertionError: If the backbone output does not squeeze to
                shape ``(1024,)``.
        """
        pil_img = Image.fromarray(img)
        tensor: Tensor = self.preprocess(pil_img)
        batch: Tensor = tensor.unsqueeze(0)  # add the batch dimension
        with torch.no_grad():
            feat = self.model(batch.to(DSNetConfig.device))
            feat = feat.squeeze().cpu().numpy()
        assert feat.shape == (1024,), f'Invalid feature shape {feat.shape}: expected 1024'
        # The epsilon keeps the division finite for an all-zero feature.
        feat /= linalg.norm(feat) + 1e-10
        return feat
class VideoPreprocessor(object):
    """Sample frame features from a video and split it into temporal segments.

    Segmentation is a kernel-based change point detection on the Gram
    matrix of the sampled frame features.
    """

    def __init__(self) -> None:
        """Create the feature extractor and read the sampling rate from the config."""
        super(VideoPreprocessor, self).__init__()
        self.model = FeatureExtractor()
        self.sample_rate = DSNetConfig.sample_rate

    def get_features(self, video_path: str) -> tuple[int, ndarray]:
        """Decode a video and extract a feature for every ``sample_rate``-th frame.

        Args:
            video_path: Path of the video file.

        Returns:
            ``(n_frames, features)``: the number of decoded frames and the
            stacked features of the sampled frames (frames 0, sample_rate,
            2 * sample_rate, ...).

        Raises:
            OSError: If the video cannot be opened.
        """
        video_path = Path(video_path)
        video_capture = cv.VideoCapture(str(video_path))
        # The constructor always returns an object, so a failed open can only
        # be detected through ``isOpened``.
        if not video_capture.isOpened():
            video_capture.release()
            raise OSError(f'Cannot open video: {video_path}')

        features: list[ndarray] = []
        n_frames: int = 0
        try:
            while True:
                ret, frame = video_capture.read()
                if not ret:
                    # End of stream (or decode failure): stop reading.
                    break
                if n_frames % self.sample_rate == 0:
                    # OpenCV decodes to BGR; the extractor is fed RGB.
                    frame = cv.cvtColor(frame, cv.COLOR_BGR2RGB)
                    feat = self.model.run(frame)
                    features.append(feat)
                n_frames += 1
        finally:
            video_capture.release()

        features = np.array(features)
        return n_frames, features

    @staticmethod
    def calculate_scatters(K: ndarray) -> ndarray:
        """Compute the scatter of every contiguous segment of a kernel matrix.

        For ``i <= j`` the result holds
        ``sum_t K[t, t] - sum_{s, t} K[s, t] / (j - i + 1)`` with ``s`` and
        ``t`` ranging over ``i..j``; entries with ``j < i`` are zero.

        Args:
            K: Square ``(n, n)`` kernel matrix.

        Returns:
            ``(n, n)`` array of segment scatters.
        """
        n = K.shape[0]
        # Prefix sums of the diagonal and 2-D prefix sums of the whole matrix.
        K1 = np.cumsum([0] + list(np.diag(K)))
        K2 = np.zeros((n + 1, n + 1))
        K2[1:, 1:] = np.cumsum(np.cumsum(K, 0), 1)
        diagK2 = np.diag(K2)

        i = np.arange(n).reshape((-1, 1))
        j = np.arange(n).reshape((1, -1))
        # The ``(j == i - 1)`` term only prevents a division by zero where
        # the segment length ``j - i + 1`` is 0; those entries are zeroed below.
        scatters = (
            K1[1:].reshape((1, -1)) - K1[:-1].reshape((-1, 1)) -
            (diagK2[1:].reshape((1, -1)) + diagK2[:-1].reshape((-1, 1)) -
             K2[1:, :-1].T - K2[:-1, 1:]) /
            ((j - i + 1).astype(np.float32) + (j == i - 1).astype(np.float32))
        )
        scatters[j < i] = 0
        return scatters

    @staticmethod
    def change_point_detect_nonlin(K: ndarray, ncp: int, lmin: int = 1, lmax: int = 100000, backtrack: bool = True) -> tuple[ndarray, ndarray]:
        """Find ``ncp`` change points minimising the total segment scatter.

        Dynamic programming over ``I[k, l]``, the minimal cost of covering
        the first ``l`` frames with ``k`` change points.

        Args:
            K: Square ``(n, n)`` kernel matrix.
            ncp: Number of change points to place.
            lmin: Minimal segment length.
            lmax: Maximal segment length.
            backtrack: If False only the scores are computed and the
                returned change points are all zero.

        Returns:
            ``(cps, scores)``: ``cps`` holds the ``ncp`` change point
            positions, ``scores[k]`` the minimal cost with ``k`` change
            points (``inf`` when infeasible).

        Raises:
            AssertionError: If ``K`` is not square or the length
                constraints cannot be satisfied.
        """
        m = int(ncp)
        n, n1 = K.shape
        assert n == n1, 'Kernel matrix awaited.'
        assert (m + 1) * lmin <= n <= (m + 1) * lmax
        assert 1 <= lmin <= lmax

        J = VideoPreprocessor.calculate_scatters(K)

        # 1e101 marks "not reachable"; it is mapped to inf before returning.
        I = 1e101 * np.ones((m + 1, n + 1))
        # NOTE(review): this slice stops before l == lmax although the
        # recurrence below accepts segments of length exactly lmax; kept
        # unchanged - confirm whether the asymmetry is intended.
        I[0, lmin:lmax] = J[0, lmin - 1:lmax - 1]

        p = np.zeros((m + 1, n + 1), dtype=int) if backtrack else np.zeros((1, 1), dtype=int)

        # k: number of change points inserted so far, i.e. the sequence is
        # split into (k + 1) segments.
        for k in range(1, m + 1):
            # l: end position of the current segment, i.e. the earliest
            # possible start of the next one once k change points exist.
            for l in range((k + 1) * lmin, n + 1):
                # tmin: the k existing change points consume at least
                # k * lmin frames, so this is the earliest start of the
                # current segment (also bounded by the maximal length).
                tmin = max(k * lmin, l - lmax)
                # tmax: exclusive upper bound of the start position, so that
                # the current segment has at least lmin frames.
                tmax = l - lmin + 1
                c = J[tmin:tmax, l - 1].reshape(-1) + \
                    I[k - 1, tmin:tmax].reshape(-1)
                I[k, l] = np.min(c)
                if backtrack:
                    p[k, l] = np.argmin(c) + tmin

        # Walk the stored argmins backwards from the end of the sequence.
        cps = np.zeros(m, dtype=int)
        if backtrack:
            cur = n
            for k in range(m, 0, -1):
                cps[k - 1] = p[k, cur]
                cur = cps[k - 1]

        scores = I[:, n].copy()
        scores[scores > 1e99] = np.inf
        return cps, scores

    @staticmethod
    def change_point_detect_auto(K: ndarray) -> tuple[ndarray, ndarray]:
        """Detect change points, choosing their number with a penalised cost.

        Scores for every possible number of change points are computed
        first; the count minimising ``score / N + penalty`` is then used
        for a second run with backtracking.

        Args:
            K: Square ``(N, N)`` kernel matrix.

        Returns:
            The ``(cps, scores)`` pair of ``change_point_detect_nonlin``
            for the selected number of change points.
        """
        m, N = len(K) - 1, len(K)
        _, scores = VideoPreprocessor.change_point_detect_nonlin(K, m, backtrack=False)

        penalties = np.zeros(m + 1)
        ncp = np.arange(1, m + 1)
        penalties[1:] = (ncp / (2.0 * N)) * (np.log(float(N) / ncp) + 1)

        costs = scores / float(N) + penalties
        m_best = np.argmin(costs)
        return VideoPreprocessor.change_point_detect_nonlin(K, m_best)

    def kernel_temporal_segment(self, n_frames: int, features: ndarray) -> tuple[ndarray, ndarray, ndarray]:
        """Segment a video from the features of its sampled frames.

        Args:
            n_frames: Number of frames in the original video.
            features: ``(seq_len, dim)`` features of the sampled frames.

        Returns:
            ``(change_points, n_frame_per_seg, picks)``: inclusive
            ``[first, last]`` frame indices of each segment, the number of
            frames per segment, and the original frame index of every
            sampled frame.

        Raises:
            ValueError: If ``features`` is empty.
        """
        seq_len = len(features)
        if seq_len == 0:
            raise ValueError('Cannot segment a video without any frame features')
        picks = np.arange(0, seq_len) * self.sample_rate

        # Gram matrix of the features (linear kernel).
        kernel = np.matmul(features, features.T)
        change_points, _ = VideoPreprocessor.change_point_detect_auto(kernel)
        # Map positions in the sampled sequence back to original frame indices.
        change_points *= self.sample_rate
        change_points = np.hstack((0, change_points, n_frames))

        begin_frames = change_points[:-1]
        end_frames = change_points[1:]
        change_points = np.vstack((begin_frames, end_frames - 1)).T
        n_frame_per_seg = end_frames - begin_frames
        return change_points, n_frame_per_seg, picks

    def run(self, video_path: str) -> tuple[int, ndarray, ndarray, ndarray, ndarray]:
        """Extract features from a video and segment it.

        Args:
            video_path: Path of the video file.

        Returns:
            ``(n_frames, features, cps, nfps, picks)`` as produced by
            ``get_features`` and ``kernel_temporal_segment``.
        """
        n_frames, features = self.get_features(video_path)
        cps, nfps, picks = self.kernel_temporal_segment(n_frames, features)
        return n_frames, features, cps, nfps, picks
class ScaledDotProductAttention(Module):
    """Batched scaled dot-product attention with dropout on the weights."""

    def __init__(self, d_k: float):
        """Store the scaling factor ``sqrt(d_k)`` and create the dropout layer.

        Args:
            d_k: Value whose square root divides the raw attention scores.
        """
        super().__init__()
        self.dropout = nn.Dropout(0.5)
        self.sqrt_d_k = math.sqrt(d_k)

    def forward(self, Q: Tensor, K: Tensor, V: Tensor) -> tuple[Tensor, Tensor]:
        """Attend over ``V`` using the similarity of ``Q`` and ``K``.

        Args:
            Q: Queries, a 3-D tensor (``torch.bmm`` operand).
            K: Keys, a 3-D tensor whose last two axes are swapped before
                the product with ``Q``.
            V: Values, a 3-D tensor multiplied by the attention weights.

        Returns:
            ``(y, attn)``: the attended values and the attention weights
            after softmax and dropout.
        """
        logits = torch.bmm(Q, K.transpose(1, 2)) / self.sqrt_d_k
        weights = torch.softmax(logits, dim=-1)
        weights = self.dropout(weights)
        return torch.bmm(weights, V), weights
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention over a single sequence (batch size 1)."""

    def __init__(self, num_head: int, num_feature: int) -> None:
        """Create the bias-free Q/K/V projections and the output layer.

        Args:
            num_head: Number of attention heads.
            num_feature: Feature size of the input; each head works on
                ``num_feature // num_head`` channels.
        """
        super(MultiHeadAttention, self).__init__()
        self.num_head = num_head

        self.Q = nn.Linear(num_feature, num_feature, bias=False)
        self.K = nn.Linear(num_feature, num_feature, bias=False)
        self.V = nn.Linear(num_feature, num_feature, bias=False)

        self.d_k = num_feature // num_head
        self.attention = ScaledDotProductAttention(self.d_k)

        # NOTE(review): this layer list was truncated in the reviewed source.
        # The trailing Dropout follows the reference DSNet implementation; it
        # has no parameters and is inactive in eval mode - confirm.
        self.fc = nn.Sequential(
            nn.Linear(num_feature, num_feature, bias=False),
            nn.Dropout(0.5),
        )

    def forward(self, x: Tensor) -> tuple[Tensor, Tensor]:
        """Apply multi-head self-attention to one sequence.

        Args:
            x: Tensor of shape ``[1, seq_len, num_feature]``; the reshapes
                below hard-code a batch size of 1.

        Returns:
            ``(y, attn)``: the output of shape ``[1, seq_len, num_feature]``
            and the attention weights returned by the attention module.
        """
        _, seq_len, num_feature = x.shape  # [1, seq_len, num_feature]
        K: Tensor = self.K(x)  # [1, seq_len, num_feature]
        Q: Tensor = self.Q(x)  # [1, seq_len, num_feature]
        V: Tensor = self.V(x)  # [1, seq_len, num_feature]

        # Split the feature axis into heads and move the heads to the front
        # so that they act as the batch dimension of the attention module.
        K = K.view(1, seq_len, self.num_head, self.d_k).permute(
            2, 0, 1, 3).contiguous().view(self.num_head, seq_len, self.d_k)
        Q = Q.view(1, seq_len, self.num_head, self.d_k).permute(
            2, 0, 1, 3).contiguous().view(self.num_head, seq_len, self.d_k)
        V = V.view(1, seq_len, self.num_head, self.d_k).permute(
            2, 0, 1, 3).contiguous().view(self.num_head, seq_len, self.d_k)

        y, attn = self.attention(Q, K, V)  # y: [num_head, seq_len, d_k]
        # Merge the heads back into the feature axis.
        y = y.view(1, self.num_head, seq_len, self.d_k).permute(
            0, 2, 1, 3).contiguous().view(1, seq_len, num_feature)

        y = self.fc(y)
        return y, attn
class AttentionExtractor(MultiHeadAttention):
    """``MultiHeadAttention`` variant whose forward pass returns only the output."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``MultiHeadAttention``."""
        super().__init__(*args, **kwargs)

    def forward(self, *inputs):
        """Run the parent forward pass and discard the attention weights."""
        attended, _weights = super().forward(*inputs)
        return attended
class DSNetAF(Module):
    """Attention network with classification, localisation and centre-ness heads."""

    def __init__(self, num_feature: int, num_hidden: int, num_head: int) -> None:
        """Build the attention backbone and the three prediction heads.

        Args:
            num_feature: Feature size of the input sequence.
            num_hidden: Width of the shared hidden layer.
            num_head: Number of attention heads of the backbone.
        """
        super(DSNetAF, self).__init__()
        self.base_model = AttentionExtractor(num_head, num_feature)
        self.layer_norm = nn.LayerNorm(num_feature)

        # NOTE(review): this layer list was truncated in the reviewed source.
        # The layers after the first Linear follow the reference DSNet
        # architecture; LayerNorm adds parameters, so confirm this matches
        # the checkpoint that gets loaded.
        self.fc1 = nn.Sequential(
            nn.Linear(num_feature, num_hidden),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.LayerNorm(num_hidden),
        )
        self.fc_cls = nn.Linear(num_hidden, 1)
        self.fc_loc = nn.Linear(num_hidden, 2)
        self.fc_ctr = nn.Linear(num_hidden, 1)

    @staticmethod
    def offset2bbox(offsets: np.ndarray) -> np.ndarray:
        """Convert per-position (left, right) offsets into segment boundaries.

        Args:
            offsets: ``(seq_len, 2)`` array; column 0 is the distance to the
                left boundary and column 1 the distance to the right one.

        Returns:
            ``(seq_len, 2)`` array of ``[index - left, index + right + 1]``.
        """
        offset_left, offset_right = offsets[:, 0], offsets[:, 1]
        seq_len, _ = offsets.shape
        indices = np.arange(seq_len)
        bbox_left = indices - offset_left
        bbox_right = indices + offset_right + 1
        bboxes = np.vstack((bbox_left, bbox_right)).T
        return bboxes

    def forward(self, x: Tensor) -> tuple[Tensor, Tensor, Tensor]:
        """Predict importance, boundary offsets and centre-ness per position.

        Args:
            x: Tensor of shape ``[1, seq_len, num_feature]``; the final
                ``view`` calls require a batch size of 1.

        Returns:
            ``(pred_cls, pred_loc, pred_ctr)`` with shapes ``[seq_len]``,
            ``[seq_len, 2]`` and ``[seq_len]``.
        """
        _, seq_len, _ = x.shape
        out = self.base_model(x)

        out = out + x  # residual connection around the attention block
        out = self.layer_norm(out)

        out = self.fc1(out)

        pred_cls = self.fc_cls(out).sigmoid().view(seq_len)
        # exp() keeps the predicted offsets positive.
        pred_loc = self.fc_loc(out).exp().view(seq_len, 2)
        pred_ctr = self.fc_ctr(out).sigmoid().view(seq_len)
        return pred_cls, pred_loc, pred_ctr

    def predict(self, seq: Tensor) -> tuple[ndarray, ndarray]:
        """Run the network and return NumPy scores and segment proposals.

        Args:
            seq: Input tensor accepted by ``forward``.

        Returns:
            ``(pred_cls, pred_bboxes)``: the scores, weighted by centre-ness
            and divided by their maximum, and the ``(seq_len, 2)`` segment
            boundaries derived from the predicted offsets.
        """
        # Autograd is disabled here because ``Tensor.numpy()`` refuses
        # tensors that require grad; this is a no-op when the caller has
        # already disabled it.
        with torch.no_grad():
            pred_cls, pred_loc, pred_ctr = self(seq)
            pred_cls *= pred_ctr
            pred_cls /= pred_cls.max() + 1e-8

        pred_cls = pred_cls.cpu().numpy()
        pred_loc = pred_loc.cpu().numpy()

        pred_bboxes = DSNetAF.offset2bbox(pred_loc)
        return pred_cls, pred_bboxes
def iou_lr(anchor_bbox: np.ndarray, target_bbox: np.ndarray) -> np.ndarray:
    """Compute the 1-D IoU between ``[left, right]`` segments.

    The union is measured as the span from the smallest left to the largest
    right boundary, so disjoint segments include the gap between them.

    Args:
        anchor_bbox: ``(N, 2)`` array of ``[left, right]`` segments.
        target_bbox: ``(N, 2)`` or broadcastable (e.g. ``(1, 2)``) array of
            ``[left, right]`` segments.

    Returns:
        Array with the IoU of each anchor/target pair; pairs whose union is
        not positive yield 0.
    """
    anchor_left, anchor_right = anchor_bbox[:, 0], anchor_bbox[:, 1]
    target_left, target_right = target_bbox[:, 0], target_bbox[:, 1]

    inter_left = np.maximum(anchor_left, target_left)
    inter_right = np.minimum(anchor_right, target_right)
    union_left = np.minimum(anchor_left, target_left)
    union_right = np.maximum(anchor_right, target_right)

    # Disjoint segments produce a negative length; clamp it to zero.
    intersect = np.maximum(inter_right - inter_left, 0)
    union = union_right - union_left
    # ``np.where`` promotes integer input to float, so the epsilon is not
    # truncated to 0 (which would give 0 / 0 = NaN for degenerate segments).
    union = np.where(union <= 0, 1e-6, union)

    iou = intersect / union
    return iou
def nms(scores: np.ndarray, bboxes: np.ndarray, thresh: float) -> tuple[np.ndarray, np.ndarray]:
    """Greedy non-maximum suppression on 1-D segments.

    Segments with ``left >= right`` are discarded. The remaining ones are
    visited by descending score; each visited segment is kept and every
    remaining segment whose IoU with it is at least ``thresh`` is removed.

    Args:
        scores: ``(N,)`` array of segment scores.
        bboxes: ``(N, 2)`` array of ``[left, right]`` segments.
        thresh: IoU threshold at or above which a segment is suppressed.

    Returns:
        ``(keep_scores, keep_bboxes)`` of the surviving segments, ordered
        by descending score.
    """
    valid_idx = bboxes[:, 0] < bboxes[:, 1]
    scores = scores[valid_idx]
    bboxes = bboxes[valid_idx]

    arg_desc = scores.argsort()[::-1]
    scores_remain = scores[arg_desc]
    bboxes_remain = bboxes[arg_desc]

    keep_bboxes = []
    keep_scores = []

    while bboxes_remain.size > 0:
        # The best remaining segment always survives.
        bbox = bboxes_remain[0]
        score = scores_remain[0]
        keep_bboxes.append(bbox)
        keep_scores.append(score)

        iou = iou_lr(bboxes_remain, np.expand_dims(bbox, axis=0))

        keep_indices = (iou < thresh)
        # The segment just kept must leave the queue even when thresh > 1;
        # otherwise the loop would never terminate.
        keep_indices[0] = False
        bboxes_remain = bboxes_remain[keep_indices]
        scores_remain = scores_remain[keep_indices]

    keep_bboxes = np.asarray(keep_bboxes, dtype=bboxes.dtype)
    keep_scores = np.asarray(keep_scores, dtype=scores.dtype)
    return keep_scores, keep_bboxes
def knapsack(values: Iterable[int],
             weights: Iterable[int],
             capacity: int
             ) -> list[int]:
    """Solve a 0/1 knapsack problem with the OR-Tools knapsack solver.

    Args:
        values: Value of each item.
        weights: Weight of each item, in the same order as ``values``.
        capacity: Maximal total weight of the selected items.

    Returns:
        Indices of the items contained in the best solution, ascending.
    """
    # NOTE(review): the constructor arguments were truncated in the reviewed
    # source; the dynamic-programming solver is the one used by the
    # reference DSNet implementation - confirm.
    knapsack_solver = KnapsackSolver(
        KnapsackSolver.KNAPSACK_DYNAMIC_PROGRAMMING_SOLVER, 'test'
    )

    # Convert to plain Python ints: callers pass NumPy integer arrays, whose
    # scalar types the native wrapper may not accept.
    values = [int(v) for v in values]
    weights = [int(w) for w in weights]
    capacity = int(capacity)

    knapsack_solver.Init(values, [weights], [capacity])
    # The solution can only be queried after the problem has been solved.
    knapsack_solver.Solve()
    packed_items = [x for x in range(0, len(weights))
                    if knapsack_solver.BestSolutionContains(x)]
    return packed_items
def get_keyshot_summ(pred: np.ndarray,
                     cps: np.ndarray,
                     n_frames: int,
                     nfps: np.ndarray,
                     picks: np.ndarray,
                     proportion: float = 0.15
                     ) -> np.ndarray:
    """Select key shots from per-pick scores with a knapsack budget.

    Args:
        pred: Score of every picked (sampled) frame.
        cps: ``(n_segments, 2)`` inclusive ``[first, last]`` frame indices.
        n_frames: Number of frames of the original video.
        nfps: Number of frames of every segment (the knapsack weights).
        picks: Original frame index of every picked frame.
        proportion: Fraction of ``n_frames`` used as knapsack capacity.

    Returns:
        Boolean array of length ``n_frames``; True marks summary frames.

    Raises:
        AssertionError: If ``pred`` and ``picks`` differ in shape.
    """
    assert pred.shape == picks.shape
    picks = np.asarray(picks, dtype=np.int32)

    # Spread each pick's score over the frames up to the next pick
    # (or up to the end of the video for the last pick).
    frame_scores = np.zeros(n_frames, dtype=np.float32)
    n_picks = len(picks)
    for idx, start in enumerate(picks):
        stop = n_frames if idx + 1 >= n_picks else picks[idx + 1]
        frame_scores[start:stop] = pred[idx]

    # A segment's value is its mean frame score, scaled to an integer.
    seg_scores = np.zeros(len(cps), dtype=np.int32)
    for seg_idx, (first, last) in enumerate(cps):
        seg_mean = frame_scores[first:last + 1].mean()
        seg_scores[seg_idx] = int(1000 * seg_mean)

    # Pick the best segments within the frame budget.
    budget = int(n_frames * proportion)
    selected = knapsack(seg_scores, nfps, budget)

    # Mark every frame of the selected segments.
    summary = np.zeros(n_frames, dtype=np.bool_)
    for seg_idx in selected:
        first, last = cps[seg_idx]
        summary[first:last + 1] = True

    return summary
def bbox2summary(seq_len: int,
                 pred_cls: np.ndarray,
                 pred_bboxes: np.ndarray,
                 change_points: np.ndarray,
                 n_frames: int,
                 nfps: np.ndarray,
                 picks: np.ndarray
                 ) -> np.ndarray:
    """Turn scored segment proposals into a key-shot summary.

    Args:
        seq_len: Length of the sampled sequence.
        pred_cls: Score of every proposal.
        pred_bboxes: ``[lo, hi)`` positions of every proposal; the values
            are used as slice bounds.
        change_points: Segment boundaries passed to ``get_keyshot_summ``.
        n_frames: Number of frames of the original video.
        nfps: Number of frames of every segment.
        picks: Original frame index of every sampled frame.

    Returns:
        The summary returned by ``get_keyshot_summ``.
    """
    # Every position takes the highest score among the proposals covering it.
    score = np.zeros(seq_len, dtype=np.float32)
    for bbox_idx, bbox in enumerate(pred_bboxes):
        lo, hi = bbox[0], bbox[1]
        covered = score[lo:hi]
        score[lo:hi] = np.maximum(covered, [pred_cls[bbox_idx]])

    return get_keyshot_summ(score, change_points, n_frames, nfps, picks)