#!/usr/bin/env python3
from __future__ import annotations

import argparse
import base64
import csv
import hashlib
import json
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature


# Default values for the command-line options; each one is used as the
# ``default=`` of the matching argparse flag.
# CSV file whose content is monitored for changes (--csv).
DEFAULT_CSV_PATH = Path("bilibili_latest_videos_259655230.csv")
# Text file listing APNs device tokens, one per line (--tokens).
DEFAULT_TOKENS_PATH = Path("apns_device_tokens.txt")
# JSON file that stores the hash of the last seen CSV content (--state).
DEFAULT_STATE_PATH = Path(".apns_csv_monitor_state.json")
# Number of seconds to wait between two checks (--interval).
DEFAULT_INTERVAL_SECONDS = 10


@dataclass(slots=True)
class APNsConfig:
    team_id: str
    key_id: str
    bundle_id: str
    auth_key_path: Path
    environment: str

    @property
    def host(self) -> str:
        if self.environment == "production":
            return "https://api.push.apple.com"
        return "https://api.sandbox.push.apple.com"


def base64url(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    unpadded = encoded.rstrip(b"=")
    return unpadded.decode("ascii")


def build_jwt(config: APNsConfig) -> str:
    """Create a signed ES256 JSON Web Token for APNs provider authentication.

    The private key is read from ``config.auth_key_path`` on every call. The
    token header carries ``config.key_id`` as ``kid``; the claims carry
    ``config.team_id`` as ``iss`` and the current Unix time as ``iat``.

    Raises:
        TypeError: If the loaded key is not an elliptic-curve private key.
    """
    key_bytes = config.auth_key_path.read_bytes()
    signing_key = serialization.load_pem_private_key(key_bytes, password=None)
    if not isinstance(signing_key, ec.EllipticCurvePrivateKey):
        raise TypeError("APNs auth key must be an EC private key")

    def encode_segment(segment: dict[str, object]) -> str:
        # Compact JSON (no spaces), then unpadded base64url.
        return base64url(json.dumps(segment, separators=(",", ":")).encode())

    header_segment = encode_segment({"alg": "ES256", "kid": config.key_id})
    claims_segment = encode_segment({"iss": config.team_id, "iat": int(time.time())})
    signing_input = f"{header_segment}.{claims_segment}"

    der_signature = signing_key.sign(
        signing_input.encode("ascii"),
        ec.ECDSA(hashes.SHA256()),
    )
    # The library returns a DER-encoded signature; the token needs the two
    # integers r and s as fixed-width 32-byte big-endian values, concatenated.
    signature_parts = decode_dss_signature(der_signature)
    raw_signature = b"".join(part.to_bytes(32, "big") for part in signature_parts)
    return f"{signing_input}.{base64url(raw_signature)}"


def read_tokens(path: Path) -> list[str]:
    """Read device tokens from *path*, one per line.

    Surrounding whitespace is stripped from every line; blank lines and lines
    starting with ``#`` are skipped. A missing file yields an empty list.
    """
    if not path.exists():
        return []
    stripped_lines = (
        raw_line.strip()
        for raw_line in path.read_text(encoding="utf-8").splitlines()
    )
    return [entry for entry in stripped_lines if entry and not entry.startswith("#")]


def csv_hash(csv_path: Path) -> str:
    """Return the SHA-256 hex digest of the raw bytes of *csv_path*."""
    digest = hashlib.sha256()
    digest.update(csv_path.read_bytes())
    return digest.hexdigest()


def latest_video_payload(csv_path: Path) -> tuple[str, str]:
    """Build the notification ``(title, body)`` from the last row of the CSV.

    The file is read as UTF-8 (a leading BOM is tolerated) with the first line
    as the header. When there are no data rows a generic "CSV changed" message
    is returned. Otherwise the body is the last row's ``title`` column,
    followed by its ``publish_time`` column on a second line when that column
    is non-empty.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        A ``(title, body)`` tuple of notification texts.
    """
    with csv_path.open("r", encoding="utf-8-sig", newline="") as file:
        rows = list(csv.DictReader(file))
    if not rows:
        return "陕西联合 CSV 已更新", "CSV 内容发生变化。"

    latest = rows[-1]
    # DictReader fills the missing fields of a short row with None, so the
    # key can exist with a None value; coalesce before calling .strip().
    title = (latest.get("title") or "").strip() or "陕西联合有新内容"
    publish_time = (latest.get("publish_time") or "").strip()
    body = f"{title}\n发布时间：{publish_time}" if publish_time else title
    return "陕西联合有新内容", body


def load_state(path: Path) -> dict[str, Any]:
    """Load the persisted monitor state from *path*.

    Args:
        path: Location of the JSON state file.

    Returns:
        The stored JSON object, or an empty dict when the file is missing,
        cannot be decoded as UTF-8 JSON, or holds a JSON value that is not an
        object.
    """
    if not path.exists():
        return {}
    try:
        state = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return {}
    # Valid JSON such as ``null`` or a list has no ``.get``; treat it like a
    # corrupt file so callers always receive a dict.
    return state if isinstance(state, dict) else {}


def save_state(path: Path, state: dict[str, Any]) -> None:
    """Persist *state* as indented UTF-8 JSON at *path*.

    The JSON is first written to a sibling ``<name>.tmp`` file and then
    renamed over *path*, so an interrupted write does not leave a truncated
    state file behind.

    Args:
        path: Destination of the state file.
        state: JSON-serializable mapping to store.

    Raises:
        OSError: If the temporary file cannot be written or renamed.
    """
    serialized = json.dumps(state, ensure_ascii=False, indent=2)
    temp_path = path.with_name(f"{path.name}.tmp")
    try:
        temp_path.write_text(serialized, encoding="utf-8")
        temp_path.replace(path)
    except OSError:
        # Do not leave a stale temporary file next to the state file.
        temp_path.unlink(missing_ok=True)
        raise


def send_apns_notification(
    config: APNsConfig,
    jwt: str,
    device_token: str,
    title: str,
    body: str,
    *,
    timeout_seconds: int = 30,
) -> None:
    """Send one alert notification to a single device by invoking ``curl``.

    Args:
        config: Supplies the APNs host and the bundle id used as ``apns-topic``.
        jwt: Provider token sent in the ``authorization`` header.
        device_token: Target device token; appended to the request URL.
        title: Alert title.
        body: Alert body.
        timeout_seconds: Upper bound, in seconds, for the whole curl transfer.
            Without a limit a stalled connection would block the caller
            indefinitely.

    Raises:
        RuntimeError: If curl exits with a non-zero status (including a
            timeout) or the HTTP status code is not 2xx.
    """
    payload = {
        "aps": {
            "alert": {
                "title": title,
                "body": body,
            },
            "sound": "default",
        }
    }
    url = f"{config.host}/3/device/{device_token}"
    # NOTE(review): the bearer token is passed as a command-line argument and
    # may therefore be visible in local process listings while curl runs.
    command = [
        "curl",
        "--http2",
        "-sS",
        "--max-time",
        str(timeout_seconds),
        "-X",
        "POST",
        url,
        "-H",
        f"authorization: bearer {jwt}",
        "-H",
        f"apns-topic: {config.bundle_id}",
        "-H",
        "apns-push-type: alert",
        "-H",
        "apns-priority: 10",
        "-H",
        "content-type: application/json",
        "-d",
        json.dumps(payload, ensure_ascii=False, separators=(",", ":")),
        # Append the HTTP status code on its own final line of stdout.
        "-w",
        "\n%{http_code}",
    ]
    result = subprocess.run(command, check=False, capture_output=True, text=True)
    output_lines = result.stdout.strip().splitlines()
    # The last line is the status written by -w; everything before it is the
    # response body. "000" stands in when curl produced no output at all.
    status = output_lines[-1] if output_lines else "000"
    response_body = "\n".join(output_lines[:-1])
    if result.returncode != 0 or not status.startswith("2"):
        raise RuntimeError(
            f"APNs failed for token {device_token[:12]}... status={status} "
            f"stdout={response_body} stderr={result.stderr.strip()}"
        )


def build_config_from_env() -> APNsConfig:
    """Assemble the APNs configuration from environment variables.

    ``APNS_TEAM_ID``, ``APNS_KEY_ID``, ``APNS_BUNDLE_ID`` and
    ``APNS_AUTH_KEY_PATH`` are required and must be non-empty.
    ``APNS_ENVIRONMENT`` is optional and defaults to ``"development"``.

    Raises:
        SystemExit: If any required variable is unset or empty; the message
            lists the missing names.
    """
    required_names = ("APNS_TEAM_ID", "APNS_KEY_ID", "APNS_BUNDLE_ID", "APNS_AUTH_KEY_PATH")
    values = {name: os.environ.get(name) for name in required_names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise SystemExit(f"缺少环境变量：{', '.join(missing)}")

    # "~" in the key path is expanded to the user's home directory.
    key_path = Path(values["APNS_AUTH_KEY_PATH"]).expanduser()
    return APNsConfig(
        team_id=values["APNS_TEAM_ID"],
        key_id=values["APNS_KEY_ID"],
        bundle_id=values["APNS_BUNDLE_ID"],
        auth_key_path=key_path,
        environment=os.environ.get("APNS_ENVIRONMENT", "development"),
    )


def _non_negative_int(value: str) -> int:
    """argparse ``type`` callable: parse *value* as an integer that is >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"不是有效的整数：{value!r}") from None
    if number < 0:
        raise argparse.ArgumentTypeError(f"不能为负数：{value!r}")
    return number


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the CSV monitor.

    Options: ``--csv``, ``--tokens`` and ``--state`` (paths), ``--interval``
    (seconds between checks) and ``--once`` (perform a single check).
    ``--interval`` rejects negative values at parse time, because a negative
    value would later make ``time.sleep`` raise ``ValueError``.
    """
    parser = argparse.ArgumentParser(description="监控 CSV 文件变更，并通过 APNs 推送到 iPhone")
    parser.add_argument("--csv", type=Path, default=DEFAULT_CSV_PATH, help="要监控的 CSV 文件")
    parser.add_argument("--tokens", type=Path, default=DEFAULT_TOKENS_PATH, help="APNs device token 列表")
    parser.add_argument("--state", type=Path, default=DEFAULT_STATE_PATH, help="存储上次 CSV 哈希的状态文件")
    parser.add_argument(
        "--interval",
        type=_non_negative_int,
        default=DEFAULT_INTERVAL_SECONDS,
        help="检查间隔秒数",
    )
    parser.add_argument("--once", action="store_true", help="只检查一次")
    return parser


def run_once(args: argparse.Namespace, config: APNsConfig) -> None:
    """Check the CSV once and push a notification if its content changed.

    Behavior:
      * Unchanged hash: print a message and return.
      * No stored hash yet: store the current hash as baseline, send nothing.
      * Changed hash but no device tokens: print a hint; the stored hash is
        left untouched so the change is picked up again on the next check.
      * Changed hash with tokens: send to every token. A failure for one
        token no longer aborts the remaining tokens. The new hash is stored
        as soon as at least one push succeeded, so devices that already got
        the notification are not notified again on every check.

    Args:
        args: Parsed CLI options; ``csv``, ``tokens`` and ``state`` are read.
        config: APNs configuration used for signing and sending.

    Raises:
        FileNotFoundError: If the CSV file does not exist.
        RuntimeError: If the push failed for every token; the stored hash is
            not updated in that case, so the next check retries.
    """
    csv_path = args.csv.expanduser().resolve()
    if not csv_path.exists():
        raise FileNotFoundError(csv_path)

    current_hash = csv_hash(csv_path)
    state = load_state(args.state)
    previous_hash = state.get("csv_hash")
    if previous_hash == current_hash:
        print(f"{datetime.now().isoformat(timespec='seconds')} CSV 未变化")
        return

    if previous_hash is None:
        state["csv_hash"] = current_hash
        state["updated_at"] = datetime.now(timezone.utc).isoformat()
        save_state(args.state, state)
        print("首次运行，已建立 CSV 基线，不发送通知")
        return

    tokens = read_tokens(args.tokens)
    if not tokens:
        print(f"CSV 已变化，但没有 device token。请把 app 中复制的 token 放入 {args.tokens}")
        return

    title, body = latest_video_payload(csv_path)
    jwt = build_jwt(config)
    failures: list[str] = []
    for token in tokens:
        try:
            send_apns_notification(config, jwt, token, title, body)
        except RuntimeError as exc:
            # Keep going: one rejected token must not block the others.
            failures.append(str(exc))

    if len(failures) == len(tokens):
        # Nothing was delivered; leave the state untouched so the next check
        # tries again.
        raise RuntimeError("全部推送失败：" + "; ".join(failures))

    for failure in failures:
        print(f"推送失败：{failure}", file=sys.stderr)

    state["csv_hash"] = current_hash
    state["updated_at"] = datetime.now(timezone.utc).isoformat()
    save_state(args.state, state)
    print(f"CSV 已变化，已推送 {len(tokens) - len(failures)} 台设备")


def main() -> int:
    """Run the monitor: parse options, load the APNs config, then poll.

    Each iteration performs one check. Errors raised by a check are printed
    to stderr and do not stop the loop. With ``--once`` the function returns
    after the first check.

    Returns:
        Process exit status. With ``--once``: 0 when the check succeeded,
        1 when it raised an error. Without ``--once`` the loop does not
        return on its own.
    """
    args = build_parser().parse_args()
    config = build_config_from_env()

    while True:
        failed = False
        try:
            run_once(args, config)
        except Exception as exc:
            # Top-level boundary: report and keep polling instead of crashing.
            print(f"错误：{exc}", file=sys.stderr)
            failed = True
        if args.once:
            # Let scripts and schedulers detect a failed single check.
            return 1 if failed else 0
        time.sleep(args.interval)


# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
