# Listing metadata: 377 lines, 19 KiB, Python
import json
import logging
import os
import sys
import time
from random import randint
from typing import Optional, Union

import requests
from rich.logging import RichHandler
# from rich.traceback import install
# install(show_locals=True)
from rich.progress import (
    SpinnerColumn,
    BarColumn,
    DownloadColumn,
    Progress,
    TaskID,
    TextColumn
)


# Log records are formatted as the bare message text.
FORMAT = "%(message)s"

# Root logging goes through a single RichHandler with timestamps hidden and
# console markup enabled in messages.
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    datefmt=None,
    handlers=[
        RichHandler(show_time=False, keywords=[''], markup=True),
    ],
)

# Shared logger for this module.
log = logging.getLogger("rich")


class Pixiv():
    """Download pixiv illustrations and their metadata.

    Image URLs are read from ``https://www.pixiv.net/ajax/illust/<id>/pages``
    and metadata from ``https://www.pixiv.net/ajax/illust/<id>``. The metadata
    file ``<data_path>/<id>.json`` doubles as the marker that an illustration
    has already been downloaded (see ``downloaded_img_id``).
    """

    # Sizes selected by ``mode="full"``.
    _ALL_SIZES = ("original", "regular", "small", "thumb_mini")
    # "origin" is the signature default but is not a key of the ``urls``
    # mapping, so it is treated as an alias of "original".
    _SIZE_ALIASES = {"origin": "original"}
    _USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
    # Timeouts in seconds; without one, ``requests`` may block indefinitely.
    _API_TIMEOUT = 20
    _HEAD_TIMEOUT = 5
    _DOWNLOAD_TIMEOUT = 20
    # Bytes requested per ``iter_content`` chunk while streaming an image.
    _CHUNK_SIZE = 8192

    def __init__(self, cookie: Optional[str] = None, header: Optional[dict] = None,
                 proxies: Optional[dict] = None, data_path: str = 'data',
                 img_path: str = 'img', overwrite: bool = False,
                 mode: Union[list, str] = "origin", retrycount: int = 10) -> None:
        """Store the request settings and output locations.

        Args:
            cookie: Value for the ``cookie`` request header, or None.
            header: Request headers for the JSON endpoints. A browser-like
                default is used when None. The dict is copied, so the
                caller's object is never modified.
            proxies: ``requests``-style proxy mapping; defaults to ``{}``.
            data_path: Directory that receives the ``<id>.json`` metadata.
            img_path: Directory that receives images, one sub-directory per
                size name.
            overwrite: When False, ``download`` skips ids that already have a
                metadata file under ``data_path``.
            mode: ``"full"`` selects every size in ``_ALL_SIZES``; any other
                string selects that single size; an iterable of strings
                selects several. Matching is case-insensitive and
                ``"origin"`` is accepted as ``"original"``.
            retrycount: Maximum number of attempts per network operation.
        """
        if header is None:
            header = {
                "content-type": "application/json; charset=UTF-8",
                "user-agent": self._USER_AGENT,
                "accept-language": "zh-CN,zh;q=0.9",
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-origin",
            }
        else:
            # Copy so that adding the cookie does not leak into the caller.
            header = dict(header)
        if cookie is not None:
            header['cookie'] = cookie

        self.cookie = cookie
        self.header = header          # request headers (cookie included, if any)
        self.proxies = {} if proxies is None else proxies
        self.mode = self.__normalize_mode(mode)
        self.data_path = data_path    # where metadata JSON files are stored
        self.img_path = img_path      # where images are stored
        self.overwrite = overwrite
        self.retrycount = retrycount

    def __normalize_mode(self, mode) -> list:
        """Return ``mode`` as a list of lower-case size names."""
        if isinstance(mode, str):
            if mode.lower() == 'full':
                return list(self._ALL_SIZES)
            requested = [mode]
        else:
            requested = list(mode)
        lowered = [item.lower() for item in requested]
        return [self._SIZE_ALIASES.get(name, name) for name in lowered]

    def get_img_url(self, illust_id: int) -> Union[list, bool]:
        """Fetch the per-page image URLs of an illustration.

        Returns:
            The ``body`` of the JSON response (``__get_img`` iterates it as a
            list of dicts that each carry a ``urls`` mapping), or False when
            every attempt failed, the response reports an error, or it has
            no ``body``.
        """
        illust_id = str(illust_id)
        log.info(f"搜索插画[{illust_id}]")
        url = f"https://www.pixiv.net/ajax/illust/{illust_id}/pages"

        payload = None
        for attempt in range(self.retrycount):
            if attempt > 0:
                log.warning(f"第{attempt}次尝试搜索插画[{illust_id}]")
            try:
                response = requests.get(url=url, headers=self.header,
                                        proxies=self.proxies,
                                        timeout=self._API_TIMEOUT)
                payload = response.json()
            except (requests.RequestException, ValueError) as search_error:
                log.error(f"获取插画[{illust_id}]链接失败\n错误信息:{search_error}")
                continue
            log.info(f"成功搜索到插画[{illust_id}]")
            break

        # Also covers retrycount == 0, where no request was ever made.
        if not isinstance(payload, dict):
            return False
        if payload.get('error'):
            log.error(f"获取插画[{illust_id}]链接出错。{payload.get('message')}")
            return False
        return payload.get('body', False)

    def get_metadata(self, illust_id: int) -> Optional[dict]:
        """Fetch the metadata JSON of an illustration.

        The request uses ``self.header`` (and therefore the cookie), the same
        headers ``get_img_url`` sends, so both endpoints see the same session.

        Returns:
            The decoded JSON response, or None when every attempt failed.
        """
        log.info(f"获取插画[{illust_id}]元数据")
        url = f"https://www.pixiv.net/ajax/illust/{illust_id}"

        for attempt in range(self.retrycount):
            if attempt > 0:
                log.warning(f"第{attempt}次尝试获取插画[{illust_id}]元数据")
            try:
                response = requests.get(url=url, headers=self.header,
                                        proxies=self.proxies,
                                        timeout=self._API_TIMEOUT)
                return response.json()
            except (requests.RequestException, ValueError) as get_metadata_error:
                log.error(
                    "[red]获取插画[[cyan]{}][/]元数据出错[/]\n错误信息:{}".format(illust_id, get_metadata_error))
        return None

    def __probe_size(self, url, header, filename) -> Optional[int]:
        """Return the ``Content-Length`` reported by a HEAD request.

        Returns None when every attempt fails or the header is missing or
        not a plain number.
        """
        for _ in range(self.retrycount):
            try:
                log.debug(f"获取[{filename}]头信息")
                head = requests.head(url=url, headers=header,
                                     proxies=self.proxies,
                                     timeout=self._HEAD_TIMEOUT)
            except requests.RequestException as get_header_error:
                log.error(f"获取文件大小出错\n错误信息:{get_header_error}")
                continue
            log.debug(f"成功获取[{filename}]头信息")
            length = head.headers.get('Content-Length')
            if length is not None and length.strip().isdecimal():
                return int(length)
            return None
        return None

    def __download_file(self, url, size_name, header, progress) -> bool:
        """Stream one image to ``<img_path>/<size_name>/<basename of url>``.

        A progress task is shown only when the size is known up front. Data
        is written to a ``.part`` file and renamed on success, so an
        interrupted transfer never leaves a truncated image under the final
        name.

        Returns:
            True once the file is written, False when every attempt failed.
        """
        filename = os.path.basename(url)
        target_dir = os.path.join(self.img_path, size_name)
        filepath = os.path.join(target_dir, filename)
        part_path = filepath + ".part"

        total = self.__probe_size(url, header, filename)
        if total is None:
            log.warning("无法获取文件大小,使用传统方法下载")
            time.sleep(3)
        self.__checkdirs(target_dir)

        for num_try in range(self.retrycount):
            if num_try > 0:
                log.warning(
                    "插画 [cyan]{}[/] 第{}次尝试下载".format(filename, num_try))
            task_id = None
            try:
                # A fresh request per attempt: a partly consumed stream
                # cannot be read again.
                with requests.get(url=url, headers=header, proxies=self.proxies,
                                  stream=True,
                                  timeout=self._DOWNLOAD_TIMEOUT) as response:
                    # Do not store an HTTP error page as if it were the image.
                    response.raise_for_status()
                    if total is not None:
                        task_id = progress.add_task(
                            "下载", status="[bold yellow]下载中...[/]",
                            filename=filename, total=total)
                    with open(part_path, "wb") as file:
                        for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                            if chunk:
                                file.write(chunk)
                                if task_id is not None:
                                    # Advance by what was actually received.
                                    progress.update(task_id, advance=len(chunk))
                os.replace(part_path, filepath)
            except (requests.RequestException, OSError) as get_img_error:
                if task_id is not None:
                    progress.remove_task(task_id)
                log.error(
                    f"[red]下载错误[/]\n错误信息:{get_img_error}")
                time.sleep(randint(2, 8))
                continue

            if task_id is not None:
                # Hide the finished bar.
                progress.update(task_id, status="[bold green]下载成功![/]",
                                visible=False)
            log.info(
                "已成功下载图片[[blue]{}[/]]\n存储目录[[blue]{}[/]]".format(filename, os.path.abspath(filepath)))
            return True

        # Every attempt failed; drop the incomplete temporary file, if any.
        if os.path.exists(part_path):
            os.remove(part_path)
        return False

    def __get_img(self, illust_id: int, data: list) -> bool:
        """Download every selected size of every page of an illustration.

        Args:
            illust_id: Illustration id, used for the referer header and as
                the metadata file name.
            data: Value returned by ``get_img_url``: an iterable of dicts,
                each with a ``urls`` mapping of size name to image URL.

        Returns:
            True when no selected file failed and the metadata was saved.
            The metadata file is written only in that case, so a partly
            downloaded illustration is retried on the next run.
        """
        header = {
            "accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
            "referer": "https://www.pixiv.net/artworks/{}".format(str(illust_id)),
            "Accept-Encoding": "identity",
            "sec-fetch-dest": "image",
            "sec-fetch-mode": "no-cors",
            "sec-fetch-site": "cross-site",
            "user-agent": self._USER_AGENT,
        }
        progress = Progress(
            SpinnerColumn(),
            TextColumn("{task.fields[status]}", justify="right"),
            "|",
            TextColumn(
                "[bold blue]{task.fields[filename]}", justify="right"),
            BarColumn(bar_width=None),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            DownloadColumn()
        )

        downloaded = 0
        failed = 0
        with progress:
            for page in data:
                for size_name, url in page['urls'].items():
                    if size_name.lower() not in self.mode:
                        log.info(
                            f'不下载规格为:"[bold green]{size_name}[/]" 的插画')
                        continue
                    if self.__download_file(url, size_name, header, progress):
                        downloaded += 1
                    else:
                        failed += 1
                    # Pause between files to avoid hammering the server.
                    time.sleep(randint(2, 4))

        if failed:
            log.error(f"插画[{illust_id}]有{failed}个文件下载失败,未保存元数据")
            return False

        metadata = self.get_metadata(illust_id=illust_id)
        if metadata is None:
            log.error(f"无法获取插画[{illust_id}]元数据,未保存")
            return False
        # The metadata file is also the "download finished" marker.
        self.__save_data(data=metadata, filename=illust_id)
        log.info(
            "插画作品[[blue]{}[/]][green]下载完成!\n[/]共下载[yellow]{}[/]张插画。规格:{}".format(illust_id, downloaded, self.mode))
        return True

    def download(self, *illust_id: int, illust_list: Optional[list] = None):
        """Download the given illustrations.

        Args:
            *illust_id: Illustration ids given positionally.
            illust_list: Further ids given as a list.

        Both sources are merged, entries that are not plain decimal numbers
        are dropped, and duplicates are removed keeping first-seen order.
        Unless ``overwrite`` is set, ids that already have a metadata file
        are skipped. A summary is logged at the end.
        """
        requested = list(illust_id) + list(illust_list or [])
        illust_id_list = self.del_re(init_id_list(requested))

        have_downloaded_img_id_list = []
        if self.overwrite:
            log.warning("已开启插图[red]覆盖保存[/]")
            download_img_id_list = illust_id_list
            log.info(f"需要下载插图:{download_img_id_list}")
        else:
            log.info("关闭插图覆盖保存")
            already = set(self.downloaded_img_id())
            download_img_id_list = [i for i in illust_id_list if i not in already]
            have_downloaded_img_id_list = [i for i in illust_id_list if i in already]
            log.info(
                f"需要下载插图:{download_img_id_list}\n插图{have_downloaded_img_id_list}已下载")

        success_list = []
        fail_list = []
        for current_id in download_img_id_list:
            img_urls = self.get_img_url(current_id)
            # Reuse the fetched URLs instead of requesting them a second time.
            if img_urls is not False and self.__get_img(illust_id=current_id, data=img_urls):
                success_list.append(current_id)
            else:
                fail_list.append(current_id)
                log.error(f"无法下载插画[{current_id}]")

        if self.overwrite:
            log.info(
                f"[bold yellow]下载完成![/]\n\n[green]成功下载[/]:{success_list}\n[red]下载失败[/]:{fail_list}")
        else:
            log.info(
                f"[bold yellow]下载完成![/]\n\n[green]成功下载[/]:{success_list}\n[yellow]无需下载[/]:{have_downloaded_img_id_list}\n[red]下载失败[/]:{fail_list}")

    def downloaded_img_id(self, datapath=None):
        """Return the ids that already have a metadata file.

        Walks ``datapath`` (default: ``self.data_path``) recursively and
        collects the numeric stem of every ``<number>.json`` file. Any other
        file is ignored.
        """
        if datapath is None:
            datapath = self.data_path
        found = []
        for _root, _dirs, files in os.walk(datapath):
            for name in files:
                stem, ext = os.path.splitext(name)
                if ext == ".json" and stem.isdecimal():
                    found.append(int(stem))
        return found

    def del_re(self, old_list) -> list:
        """Convert every item to int and drop duplicates, keeping order.

        Items are converted before they are compared, so ``"1"`` and ``1``
        count as the same id.
        """
        return list(dict.fromkeys(int(item) for item in old_list))

    def __save_data(self, filename, data, path=None):
        """Write ``data`` as JSON to ``<path>/<filename>.json``.

        Args:
            filename: File name without extension (converted with ``str``).
            data: JSON-serialisable content.
            path: Target directory; defaults to ``self.data_path`` so the
                file lands where ``downloaded_img_id`` looks for it.
        """
        if path is None:
            path = self.data_path
        self.__checkdirs(path=path)
        filepath = os.path.join(path, f"{filename}.json")
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        log.debug("data文件[[blue]{}[/]][green]保存成功![/]".format(filepath))

    def __checkdirs(self, path):
        """Create directory ``path`` (including parents) if it is missing."""
        # NOTE(review): the handler is configured with markup=True, so a
        # bracketed path in these messages may be parsed as console markup -
        # confirm the rendered output.
        if os.path.isdir(path):
            log.debug("文件夹[{}]已存在".format(path))
            return
        # exist_ok avoids failing if the directory appears in the meantime.
        os.makedirs(path, exist_ok=True)
        log.warning("文件夹[{}]不存在,已自动创建".format(path))

    def save_metadata(self, illust_id):
        """Fetch and store the metadata of one illustration.

        Returns:
            True when the metadata was saved, False when it could not be
            fetched (nothing is written in that case).
        """
        metadata = self.get_metadata(illust_id)
        if metadata is None:
            log.error(f"无法获取插画[{illust_id}]元数据,未保存")
            return False
        self.__save_data(filename=illust_id, data=metadata)
        return True


def init_id_list(id_list: list):
    """Return the valid illustration ids in ``id_list`` as ints.

    An entry is kept when its string form consists only of decimal digits;
    anything else (negative numbers, floats, empty strings, other text) is
    dropped. Order and duplicates are preserved.
    """
    # ``isdecimal`` rather than ``isdigit``: characters such as "²" pass
    # ``isdigit`` but make ``int()`` raise ValueError.
    return [int(str(item)) for item in id_list if str(item).isdecimal()]


if __name__ == "__main__":
    # Illustration ids come from the command line; invalid entries are dropped.
    argv_illust_list = init_id_list(sys.argv[1:])
    proxies = {'http': 'http://127.0.0.1:7890',
               'https': 'http://127.0.0.1:7890'}
    # SECURITY: a live pixiv session cookie used to be hard-coded here. It is
    # now read from the PIXIV_COOKIE environment variable; the cookie that was
    # committed should be treated as compromised and invalidated.
    mycookie = os.environ.get("PIXIV_COOKIE") or None
    if mycookie is None:
        log.warning("未设置环境变量 PIXIV_COOKIE,将以未登录状态访问")
    pixiv = Pixiv(cookie=mycookie, proxies=proxies, overwrite=False,
                  mode="full", retrycount=100, img_path='img')
    pixiv.download(illust_list=argv_illust_list)