import json
import logging
import os
import sys
import time
from random import randint

import requests
from rich.logging import RichHandler
# from rich.traceback import install
# install(show_locals=True)
from rich.progress import (
    SpinnerColumn,
    BarColumn,
    DownloadColumn,
    Progress,
    TaskID,
    TextColumn,
)

FORMAT = "%(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=FORMAT,
    datefmt=None,
    handlers=[RichHandler(show_time=False, keywords=[''], markup=True)],
)
log = logging.getLogger("rich")


class Pixiv:
    """Small Pixiv illustration downloader built on the www.pixiv.net ajax API."""

    def __init__(self, cookie: str = None, header: dict = None,
                 proxies: dict = None, data_path: str = 'data',
                 img_path: str = 'img', overwrite: bool = False,
                 mode: "str | list" = "original", retrycount: int = 10) -> None:
        '''
        ### Initialization
        Set up cookie, headers, proxies, metadata path (data_path) and image path (img_path).

        `overwrite` True: re-download and overwrite even if metadata already exists
        in data_path; False: skip already-downloaded illustrations.

        `mode` download mode. `"full"` downloads every size; a single size name
        downloads only that size; a list selects several sizes out of
        ["original", "regular", "small", "thumb_mini"].
        Default is "original" (original image only).
        NOTE: the previous default "origin" matched none of the Pixiv size keys,
        so the default configuration downloaded nothing — fixed to "original".

        `retrycount` number of automatic retries after an error.
        '''
        if header is None:
            # NOTE: the original literal declared "content-type" twice; only the
            # last value survives in a dict literal, so that value is kept here.
            header = {
                "user-agent": ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
                               "AppleWebKit/537.36 (KHTML, like Gecko) "
                               "Chrome/86.0.4240.198 Safari/537.36"),
                "accept-language": "zh-CN,zh;q=0.9",
                "sec-fetch-dest": "empty",
                "sec-fetch-mode": "cors",
                "sec-fetch-site": "same-origin",
                "content-type": "charset=UTF-8",
            }
        if cookie is not None:
            header.update({'cookie': cookie})
        self.cookie = cookie
        if proxies is None:
            proxies = {}

        # Normalize the requested sizes to a lowercase list.
        if isinstance(mode, str):
            mode = mode.lower()
            if mode == 'full':
                self.mode = ["original", "regular", "small", "thumb_mini"]
            else:
                self.mode = [mode]
        else:
            self.mode = [m.lower() for m in mode]

        self.header = header        # request headers (includes cookie, if any)
        self.proxies = proxies      # proxy settings
        self.data_path = data_path  # where illustration metadata (json) is stored
        self.img_path = img_path    # where images are stored
        self.overwrite = overwrite  # overwrite-save setting
        self.retrycount = retrycount

    def get_img_url(self, illust_id: int) -> "bool | dict":
        '''
        ### Fetch the image URLs for a Pixiv illustration id.
        Returns the response 'body' dict on success, False on any failure.
        '''
        illust_id = str(illust_id)
        log.info(f"搜索插画[{illust_id}]")
        origin_url = "https://www.pixiv.net/ajax/illust/{}/pages"
        response = None  # stays None if every attempt raises (was: unbound-variable risk)
        for i in range(self.retrycount):
            try:
                if i > 0:
                    log.warning(f"第{i}次尝试搜索插画[{illust_id}]")
                response = requests.get(
                    url=origin_url.format(illust_id),
                    headers=self.header,
                    proxies=self.proxies,
                ).json()
                log.info(f"成功搜索到插画[{illust_id}]")
                break  # success, stop retrying
            except Exception as search_error:
                log.error(f"获取插画[{illust_id}]链接失败\n错误信息:{search_error}")
                response = None
        if response is None:
            return False  # every retry failed
        if response['error']:
            log.error(f"获取插画[{illust_id}]链接出错。{response['message']}")
            return False
        return response['body']

    def get_metadata(self, illust_id: int) -> dict:
        '''
        ### Fetch illustration metadata.
        Returns the decoded json dict, or None if every retry failed.
        '''
        log.info(f"获取插画[{illust_id}]元数据")
        header = {
            "content-type": "charset=UTF-8",
            "user-agent": ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/86.0.4240.198 Safari/537.36"),
            "accept-language": "zh-CN,zh;q=0.9",
            "sec-fetch-dest": "empty",
            "sec-fetch-mode": "cors",
            "sec-fetch-site": "same-origin",
            "content-encoding": "gzip",
        }
        url = "https://www.pixiv.net/ajax/illust/{}"
        for i in range(self.retrycount):
            try:
                if i > 0:
                    log.warning(f"第{i}尝试获取插画[{illust_id}]元数据")
                response = requests.get(
                    url=url.format(illust_id), headers=header, proxies=self.proxies)
                return response.json()
            except Exception as get_metadata_error:
                log.error("[red]获取插画[[cyan]{}][/]元数据出错[/]\n错误信息:{}".format(
                    illust_id, get_metadata_error))
        return None  # explicit: all retries exhausted

    def __get_img(self, illust_id: int, data: dict):
        '''
        ### Download every page of an illustration in the configured sizes.
        `data` is the 'body' list returned by get_img_url(); each entry carries a
        'urls' dict mapping size name -> source URL. Saves metadata afterwards as
        the marker that the download completed.
        '''
        header = {
            "accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
            "referer": "https://www.pixiv.net/artworks/{}".format(str(illust_id)),
            "Accept-Encoding": "identity",
            "sec-fetch-dest": "image",
            "sec-fetch-mode": "no-cors",
            "sec-fetch-site": "cross-site",
            "user-agent": ("Mozilla/5.0 (Windows NT 10.0; WOW64) "
                           "AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/86.0.4240.198 Safari/537.36"),
        }
        progress = Progress(
            SpinnerColumn(),
            TextColumn("{task.fields[status]}", justify="right"),
            "|",
            TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
            BarColumn(bar_width=None),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            DownloadColumn(),
        )
        num = 0  # total downloaded images (was reset per page, miscounting multi-page works)
        with progress:
            for urls_data in data:  # one entry per page of the illustration
                urls = urls_data['urls']  # size name -> URL
                for size_name, url in urls.items():
                    if size_name.lower() not in self.mode:
                        log.info(f"不下载规格为:\"[bold green]{size_name}[/]\" 的插画")
                        continue
                    filename = os.path.basename(url)

                    # Probe the file size with HEAD (with retries). back_header
                    # stays None if every attempt fails (was: unbound variable).
                    back_header = None
                    for i in range(self.retrycount):
                        try:
                            log.debug(f"获取[{filename}]头信息")
                            response_header = requests.head(
                                url=url, headers=header,
                                proxies=self.proxies, timeout=5)
                            back_header = response_header.headers
                            log.debug(f"成功获取[{filename}]头信息")
                            break
                        except Exception as get_header_error:
                            log.error(f"获取文件大小出错\n错误信息:{get_header_error}")

                    dirpath = f"{self.img_path}{os.sep}{size_name}"
                    self.__checkdirs(dirpath)  # ensure target folder exists
                    filepath = f"{dirpath}{os.sep}{filename}"

                    if back_header is not None and 'Content-Length' in back_header:
                        # Server reported the size: stream with a progress bar.
                        length = int(back_header['Content-Length'])
                        for num_try in range(self.retrycount):
                            task_id = progress.add_task(
                                "下载", status="[bold yellow]下载中...[/]",
                                filename=filename, start=False)
                            progress.update(task_id=task_id, total=length)
                            try:
                                # Re-issue the GET on every attempt: a stream from a
                                # failed attempt is partially consumed and unusable.
                                response = requests.get(
                                    url=url, headers=header, proxies=self.proxies,
                                    stream=True, timeout=20)
                                with open(filepath, "wb") as file:
                                    for chunk in response.iter_content(chunk_size=512):
                                        # advance by the real chunk length (the last
                                        # chunk is usually shorter than 512)
                                        progress.update(task_id, advance=len(chunk))
                                        if chunk:
                                            file.write(chunk)
                                progress.update(task_id=task_id, visible=False)
                                progress.update(
                                    task_id=task_id,
                                    status="[bold green]下载成功![/]", refresh=False)
                                log.info("已成功下载图片[[blue]{}[/]]\n存储目录[[blue]{}[/]]".format(
                                    filename, os.path.abspath(filepath)))
                                break
                            except Exception as get_img_error_2:
                                progress.remove_task(task_id)
                                log.error("[red]下载错误[/]\n错误原因:{}\n第{}次尝试下载".format(
                                    get_img_error_2, num_try))
                                time.sleep(randint(2, 8))
                    else:
                        # No Content-Length available: plain one-shot download.
                        log.warning("无法获取文件大小,使用传统方法下载")
                        time.sleep(3)
                        for num_try in range(self.retrycount):
                            try:
                                if num_try > 0:
                                    log.warning("插画 [cyan]{}[/] 第{}次尝试下载".format(
                                        filename, num_try))
                                response = requests.get(
                                    url=url, headers=header, proxies=self.proxies)
                                with open(filepath, 'wb') as file:
                                    file.write(response.content)
                                log.info("已成功下载图片[[blue]{}[/]]\n存储目录[[blue]{}[/]]".format(
                                    filename, os.path.abspath(filepath)))
                                break
                            except Exception as get_img_error:
                                log.error(f"[red]下载错误[/]\n错误信息:{get_img_error}")
                                time.sleep(randint(2, 4))
                    num += 1
                    time.sleep(randint(2, 4))  # be gentle with the server
        # Save metadata; its presence marks the illustration as fully downloaded.
        metadata = self.get_metadata(illust_id=illust_id)
        self.__save_data(data=metadata, filename=illust_id)
        log.info("插画作品[[blue]{}[/]][green]下载完成!\n[/]共下载[yellow]{}[/]张插画。规格:{}".format(
            illust_id, num, self.mode))

    def download(self, *illust_id: int, illust_list: list = None):
        '''
        ### Automatic download.
        `*illust_id` illustration ids (collected into a tuple);
        `illust_list` an additional list of ids.
        '''
        if illust_list is None:  # avoid the shared mutable-default pitfall
            illust_list = []
        # Deduplicated list of ids to process.
        illust_id_list = self.del_re(init_id_list(list(illust_id)) + illust_list)
        have_downloaded_img_id_list = []  # ids already on disk (empty when overwriting)
        if self.overwrite:
            log.warning("已开启插图[red]覆盖保存[/]")
            download_img_id_list = illust_id_list
            log.info(f"需要下载插图:{download_img_id_list}")
        else:
            log.info("关闭插图覆盖保存")
            downloaded_img_id_list = self.downloaded_img_id()
            download_img_id_list = []
            for pid in illust_id_list:
                if pid in downloaded_img_id_list:
                    have_downloaded_img_id_list.append(pid)
                else:
                    download_img_id_list.append(pid)
            log.info(f"需要下载插图:{download_img_id_list}\n插图{have_downloaded_img_id_list}已下载")

        success_list = []
        fail_list = []
        for pid in download_img_id_list:
            img_urls = self.get_img_url(pid)
            if img_urls is False:
                fail_list.append(pid)
                log.error(f"无法下载插画[{pid}]")
            else:
                # Reuse the result already fetched (was: a second identical request).
                self.__get_img(illust_id=pid, data=img_urls)
                success_list.append(pid)
        if self.overwrite:
            log.info(f"[bold yellow]下载完成![/]\n\n[green]成功下载[/]:{success_list}\n[red]下载失败[/]:{fail_list}")
        else:
            log.info(f"[bold yellow]下载完成![/]\n\n[green]成功下载[/]:{success_list}\n[yellow]无需下载[/]:{have_downloaded_img_id_list}\n[red]下载失败[/]:{fail_list}")

    def downloaded_img_id(self, datapath=None) -> list:
        '''
        ### Collect the ids of already-downloaded illustrations from the data folder.
        A saved `<id>.json` metadata file marks an illustration as downloaded.
        '''
        if datapath is None:
            datapath = self.data_path
        downloaded_img_id = []
        for root, dirs, files in os.walk(datapath):
            for file in files:
                stem = file.removesuffix(".json")
                if stem.isdigit():  # skip stray non-metadata files (was: crash on int())
                    downloaded_img_id.append(int(stem))
        return downloaded_img_id

    def del_re(self, old_list) -> list:
        '''### Deduplicate a list of ids, coercing every entry to int.'''
        new_list = []
        for item in old_list:
            value = int(item)
            # compare the coerced value so "123" and 123 count as duplicates
            if value not in new_list:
                new_list.append(value)
        return new_list

    def __save_data(self, filename, data, path: str = None):
        '''
        ### Save json data to a json file (handy for building an API later).
        #### Parameters
        `data` content to save;
        `filename` file name (without extension);
        `path` target folder. Defaults to self.data_path so that saving and
        downloaded_img_id() stay consistent when a custom data_path is used.
        '''
        if path is None:
            path = self.data_path
        filename = str(filename)
        self.__checkdirs(path=path)  # make sure the folder exists
        filepath = path + os.sep + filename + '.json'
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)
        log.debug("data文件[[blue]{}[/]][green]保存成功![/]".format(filepath))

    def __checkdirs(self, path):
        '''
        ### Ensure a folder exists.
        Creates it (with parents) when missing, to avoid open() errors later.
        '''
        if not os.path.exists(path):
            os.makedirs(path)
            log.warning("文件夹[{}]不存在,已自动创建".format(path))
        else:
            log.debug("文件夹[{}]已存在".format(path))

    def save_metadata(self, illust_id):
        '''### Fetch and persist the metadata of one illustration.'''
        metadata = self.get_metadata(illust_id)
        self.__save_data(filename=illust_id, data=metadata)


def init_id_list(id_list: list) -> list:
    '''Coerce the given list to ints, dropping entries that are not pure digits.'''
    return [int(pid) for pid in id_list if str(pid).isdigit()]


if __name__ == "__main__":
    argv_illust_list = init_id_list(sys.argv[1:])
    proxies = {'http': 'http://127.0.0.1:7890',
               'https': 'http://127.0.0.1:7890'}
    # SECURITY: this is a live session cookie committed to source. It grants
    # access to the associated Pixiv account — rotate it and load it from an
    # environment variable or config file instead of hard-coding it here.
    mycookie = 'first_visit_datetime_pc=2025-03-08%2021%3A58%3A12; p_ab_id=5; p_ab_id_2=5; p_ab_d_id=1483596866; yuid_b=l3NXQlA; __utmc=235335808; _gid=GA1.2.1327134407.1741438705; __cf_bm=XgiXftZioXifDIHrl58j2CwSs_0lXgwU0NZcYn_DfOE-1741447603-1.0.1.1-JnOjgcFWX3oiWO6RJy.aKIWdkIcwMBWHQlCqPRf_r5AVY3IdWm0nc7g09ylEyu4sXuFFAoNTo9FCjl83eXshUT4CmqEEOBLQeD1TpkCBB2_eFOxJorUD4mQXodqsoW5o; cc1=2025-03-09%2000%3A26%3A46; cf_clearance=gmvBLozuCUikUDEzkdJBoVdoIMdl.TqBf1M3yfYlXkw-1741447619-1.2.1.1-pagKya7g.ZaCemcT4nTmmJ7R.g3iLCZ.uQKet1aga4eVa.NJGtykla4ebZeJxPzegCuuWJbQSdm2C6OLuM_kVNw2kTz8ebBuQhsbXmoh.RKZxPqT1xXoW.h1AaDEyuw8di7qSqki1OY63HbXZi5n6QM8WNilbYD0XLK7XgBskxjdW9CUa7K_odYKzyKnQzsDPxvqdQq8tunOdLCFIVspRhH68fwJyPRZkENtQRgBQasZUAK3vV1cfPDGah0p_P20WjdVseGUsxwkE0fEdtDxWMWiZdY7X8Ap_lQxFVlgZO1dT6mMuPC.lGWyF0GodCi9HBJK33bfxp204GxQkre5.WBR1q5NkEK07MitoBg3XpGyD_1j5u9ypO4F27K7zLv0vk5_3M9oNG3XPjqt.OF7RZRq9X8rLuYJ6pocfT5v2zg; PHPSESSID=78817947_BZAhwz9icmcJwYSt2Q8GqEqpppOWEI9f; device_token=102d86804f1d11b276deaafe40046de4; privacy_policy_agreement=7; _ga_MZ1NL4PHH0=GS1.1.1741447612.2.1.1741447768.0.0.0; c_type=24; privacy_policy_notification=0; a_type=0; b_type=1; __utma=235335808.1208433761.1741438703.1741438703.1741447776.2; __utmz=235335808.1741447776.2.2.utmcsr=accounts.pixiv.net|utmccn=(referral)|utmcmd=referral|utmcct=/login; __utmv=235335808.|2=login%20ever=no=1^3=plan=normal=1^5=gender=male=1^6=user_id=78817947=1^9=p_ab_id=5=1^10=p_ab_id_2=5=1^11=lang=zh=1; __utmt=1; __utmb=235335808.1.10.1741447776; _ga_75BBYNYN9J=GS1.1.1741447784.2.0.1741447799.0.0.0; _ga=GA1.2.1541592919.1741438703; _gat_UA-1830249-3=1'
    pixiv = Pixiv(cookie=mycookie, proxies=proxies, overwrite=False,
                  mode="full", retrycount=100, img_path='img')
    pixiv.download(illust_list=argv_illust_list)