之前在搭建navidrome的时候用了一套半自动下载HIFINI百度云音乐的方法,整体操作还是稍难;这次借助ChatGPT,做了一套基本全自动的下载方案,只需要输入链接和密码,直接将音质最优的音乐下载到本地;

前端效果如下:
网页效果.png
后端执行进行处理:
后端效果.png
后端效果2.png


说一下整体逻辑:

1.前端收到链接和提取码,反馈给后端,后端解析
2.使用BaiduPCS的linux版本对解析的结果进行访问(BaiduPCS可以在linux上进行转存和下载,对于音乐库很合适)
3.对共享文件进行递归,可能找到多个音质的文件,只下载音质最好的FLAC>WAV>MP3
4.对转存后的文件下载到目标文件夹


接下来分步骤:

1.在github下载BaiduPCS Github链接,在release中找到对应的最新版本,linux还是windows
2.创建python脚本
第一个脚本是一个专门针对BaiduPCS的类:

import numpy
import os
import pandas
import subprocess
from pathlib import Path
import re
from datetime import datetime

class BaiduPCS:
    PCS_BIN = Path("/opt/BaiduPCS-Go/BaiduPCS-Go")
    COOKIE_FULL = Path("/mnt/data/cookies/baiduyun.txt")
    COOKIE_SIMPLE = Path("/mnt/data/cookies/baidu.txt")

    def __init__(self, user_id):
        self.user_id = user_id
        self.user_dict = None

    def _exec(self, command_list, stdinput=''):
        if isinstance(command_list, str):
            raise ValueError("馃毇 涓嶅厑璁镐紶瀛楃涓插懡浠わ紒蹇呴』鏄?list[str]锛?)

        print("馃殌 鎵ц鍛戒护:", " ".join(command_list))

        if stdinput:
            process = subprocess.Popen(command_list, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            stdout, stderr = process.communicate(input=stdinput)
        else:
            result = subprocess.run(command_list, capture_output=True, text=True)
            stdout, stderr = result.stdout, result.stderr

        if stderr:
            print("馃毃 STDERR:\n", stderr)
        return stdout

    def _parse_kv_line(self, text):
        matches = re.findall(r'(\S+):\s*([^,]+)', text.replace("褰撳墠甯愬彿 ", ""))
        user_info = {}
        for k, v in matches:
            if v.isdigit():
                user_info[k] = int(v)
            else:
                try: user_info[k] = float(v)
                except: user_info[k] = v.strip()
        return user_info

    def _parse_ls_output(self, raw_text):
        lines = raw_text.strip().splitlines()
        entries = []
        for line in lines:
            line = line.strip()
            if not re.match(r"^\d+\s", line):
                continue
            match = re.match(r"(\d+)\s+(\S+)\s+([\d\-]+\s[\d:]+)\s+(.+)", line)
            if match:
                idx, size, date_str, name = match.groups()
                try:
                    date = datetime.strptime(date_str.strip(), "%Y-%m-%d %H:%M:%S")
                except:
                    date = date_str.strip()
                entries.append({
                    "index": int(idx),
                    "size": size,
                    "modified": date,
                    "name": name.strip(),
                    "type": "dir" if name.strip().endswith("/") else "file"
                })
        return entries

    def _extract_cookie(self, key):
        with self.COOKIE_FULL.open() as f:
            for line in f:
                parts = line.strip().split('\t')
                if len(parts) >= 7 and parts[5] == key:
                    return parts[6]
        return None

    def login(self):
        user = self.get_userinfo()
        if user.get("鐢ㄦ埛鍚?) == self.user_id:
            print(f"鉁?宸茬櫥褰曡处鍙凤細{self.user_id}")
            return
        self.logout()

        bduss = self._extract_cookie("BDUSS")
        stoken = self._extract_cookie("STOKEN")

        if not bduss:
            raise RuntimeError("鉂?鏃犳硶鎻愬彇 BDUSS锛宑ookie 鏂囦欢鍑洪敊")

        cookie_string = f"BDUSS={bduss};"
        if stoken:
            cookie_string += f" STOKEN={stoken};"

        self.COOKIE_SIMPLE.write_text(cookie_string)
        print("馃攽 鐧诲綍涓?..")
        self._exec([str(self.PCS_BIN), "login", f"--cookies={cookie_string}"])
        self.user_dict = self.get_userinfo()

    def logout(self):
        print("馃敀 姝e湪鐧诲嚭...")
        self._exec([str(self.PCS_BIN), "logout"], stdinput="y\n")

    def get_userinfo(self):
        result = self._exec([str(self.PCS_BIN), "who"])
        return self._parse_kv_line(result)

    def list_dir(self, path="/"):
        path = path.strip().strip('"').rstrip("/")
        if not path.startswith("/"):
            path = "/" + path
        raw = self._exec([str(self.PCS_BIN), "ls", path])
        print(raw)
        return self._parse_ls_output(raw)

    def download(self, remote_path, local_dir="."):
        Path(local_dir).mkdir(parents=True, exist_ok=True)
        filename = Path(remote_path).name
        local_file = Path(local_dir) / filename

        print(f"猬囷笍 涓嬭浇涓細{remote_path} 鈫?{local_dir}")
        cmd = [
            str(self.PCS_BIN),
            "download",
            remote_path,
            "-saveto", str(local_dir),
            "-overwrite"
        ]
        output = self._exec(cmd)
        print(f"馃摝 涓嬭浇杈撳嚭:\n{output}")

        if local_file.exists():
            print(f"鉁?宸蹭笅杞? {local_file} ({local_file.stat().st_size} bytes)")
        else:
            print(f"鉂?涓嬭浇澶辫触: {local_file} 涓嶅瓨鍦?)

    def download_recursive(self, path="/share_tmp", out_dir="./downloads"):
        print(f"馃搨 璁块棶鐩綍: {path}")
        items = self.list_dir(path)
        for item in items:
            remote_full = f"{path.rstrip('/')}/{item['name'].strip('/')}"
            local_full = Path(out_dir) / Path(remote_full.strip("/"))
            if item["type"] == "dir":
                self.download_recursive(remote_full, local_full)
            else:
                self.download(remote_full, local_full.parent)

这个类里面,主要是针对BaiduPCS的指令包装,能够在python里使用;
这里需要注意的是需要获取到百度云的Cookie来进行登录,或者你先在服务器上登录好账号,就不需要通过代码登录了;

第二段是主函数:

# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, render_template
import uuid
from pathlib import Path
import shutil
import re
from baidupcs_client import BaiduPCS  # 假设你已经把之前的 BaiduPCS 类提成模块了
import time

app = Flask(__name__, template_folder="templates")

@app.route("/", methods=["GET"])
def index():
    return render_template("index.html")

@app.route("/submit", methods=["POST"])
def handle_submission():
    data = request.form
    share_link = data.get("link")
    password = data.get("password")
    file_type = data.get("type", "music")

    if not share_link or not password:
        return jsonify({"error": "缺少分享链接或密码"}), 400

    temp_id = uuid.uuid4().hex[:8]
    temp_dir = Path(f"./temp_{temp_id}")
    temp_dir.mkdir(parents=True, exist_ok=True)
    print(f"[INFO] 本地临时目录: {temp_dir}")
    temp_dir = Path("/mnt/data/music/")

    pcs = BaiduPCS("overstic")
    print("[INFO] 登录百度网盘账户...")
    pcs.login()
    try:
        login_output = pcs.get_userinfo()
        print(login_output)
        print(f"[DEBUG] login_output (get_userinfo): {login_output}")
        if not login_output:
            raise Exception("登录返回为空(get_userinfo() 没有返回用户信息)")
        if not isinstance(login_output, dict):
            raise Exception(f"登录返回格式错误,应为 dict,实际类型: {type(login_output)}")
        if "用户名" not in login_output:
            raise Exception(f"登录返回中未包含用户名字段: {login_output}")
        if login_output.get("用户名") != "overstic":
            raise Exception(f"登录失败或用户不匹配,返回用户名: {login_output.get('用户名')}")
    except Exception as e:
        raise Exception(f"[ERROR] 登录异常: {e}")
    
    time.sleep(1)
    
    print("[INFO] 检查并删除已有的 /share_tmp")
    dir_check = pcs._exec([str(pcs.PCS_BIN), "ls", "/"])
    time.sleep(1)
    print(f"[DEBUG] 根目录 ls 输出: {dir_check}")
    if "/share_tmp" in dir_check or "share_tmp/" in dir_check:
        print("[DEBUG] 检测到 share_tmp 存在,执行删除")
        del_output = pcs._exec([str(pcs.PCS_BIN), "rm", "/share_tmp"])
        print(f"[DEBUG] 删除输出: {del_output}")
        if "错误" in del_output or "失败" in del_output:
            # 检查是否仍存在
            double_check = pcs._exec([str(pcs.PCS_BIN), "ls", "/"])
            if "/share_tmp" in double_check or "share_tmp/" in double_check:
                raise Exception("无法删除旧的 /share_tmp")
            else:
                print("[WARN] 删除返回异常,但确认目录已不在,继续...")

        print("[DEBUG] 尝试清理回收站中残留的 /share_tmp")
        pcs._exec([str(pcs.PCS_BIN), "recycle", "delete"])
    time.sleep(1)

    print("[INFO] 创建新的 /share_tmp")
    mk_output = pcs._exec([str(pcs.PCS_BIN), "mkdir", "/share_tmp"])
    print(f"[DEBUG] mkdir 输出: {mk_output}")
    if "文件已存在" in mk_output:
        print("[WARN] share_tmp 已存在,跳过创建")
    elif "错误" in mk_output or "失败" in mk_output or "EOF" in mk_output:
        raise Exception("创建 /share_tmp 失败,可能是网络问题或 API 限制")
    time.sleep(1)
    print("[INFO] 准备转存分享链接...")
    pcs._exec([str(pcs.PCS_BIN), "reset"])
    pcs._exec([str(pcs.PCS_BIN), "set", "enable_https=false"])
    pcs._exec([str(pcs.PCS_BIN), "cd", "/share_tmp"])

    match = re.match(r"(https?://pan\\.baidu\\.com/s/[\w-]+)(?:\\?pwd=([\w]+))?", share_link.strip())
    if match:
        share_url = match.group(1)
        extracted_pwd = match.group(2) if match.group(2) else password.strip()
    else:
        share_url = share_link.strip()
        extracted_pwd = password.strip()

    transfer_cmd = [str(pcs.PCS_BIN), "transfer", share_url]
    if extracted_pwd:
        transfer_cmd.append(extracted_pwd)

    print(f"[DEBUG] 执行转存命令: {' '.join(transfer_cmd)}")
    transfer_output = pcs._exec(transfer_cmd)
    print("[INFO] 转存输出:")
    print(transfer_output)

    if ("提取码错误" in transfer_output
        or "链接不存在" in transfer_output
        or "请输入提取码" in transfer_output
        or "转存到网盘失败" in transfer_output
        or "链接地址或提取码非法" in transfer_output
        or "获取分享项元数据错误" in transfer_output
        or "转存失败" in transfer_output):
        raise Exception(f"转存失败,请检查链接或密码。转存输出: {transfer_output}")

    share_root = "/share_tmp"
    time.sleep(1)
    if file_type == "music":
        print("[INFO] 扫描音乐目录...")
        music_items = find_music_tracks(pcs, share_root)
        print(f"[INFO] 找到音频文件: {music_items}")
        if not music_items:
            return jsonify({"error": "分享链接中未找到音乐文件"}), 404
        for remote_path in music_items:
            print(f"[INFO] 开始下载: {remote_path}")
            dl_output = pcs.download(remote_path, temp_dir)

    time.sleep(1)
    try:
        print("[INFO] 清理 /share_tmp")
        rm_output = pcs._exec([str(pcs.PCS_BIN), "rm", "/share_tmp"])
        print(f"[DEBUG] 删除输出: {rm_output}")
        if "失败" in rm_output or "错误" in rm_output:
            raise Exception("清理 /share_tmp 失败")
    except Exception as e:
        print(f"[ERROR] 删除失败: {e}")

    return jsonify({"status": "下载任务完成,已清理百度云目录", "path": str(temp_dir)})

def find_music_tracks(pcs, path):
    priority = [".flac", ".ape", ".wav", ".mp3", ".m4a"]
    music_files = []
    all_items = pcs.list_dir(path)

    subdirs = [item for item in all_items if item['type'] == 'dir']
    files = [item for item in all_items if item['type'] == 'file']

    grouped = {}
    for file in files:
        base = Path(file['name']).stem
        ext = Path(file['name']).suffix.lower()
        grouped.setdefault(base, []).append((ext, file))

    for versions in grouped.values():
        versions.sort(key=lambda x: priority.index(x[0]) if x[0] in priority else 999)
        best = versions[0][1]
        music_files.append(f"{path.rstrip('/')}/{best['name']}")

    for sub in subdirs:
        music_files.extend(find_music_tracks(pcs, f"{path.rstrip('/')}/{sub['name'].strip('/')}") )

    return music_files

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

主要用来支持前端输入,后端处理,会调用第一个py文件;
注意代码里

pcs = BaiduPCS("overstic")

对应的是我的账号名,改成自己的账号名称即可

标签: none

添加新评论