import asyncio
import json
import os
import pickle
import re

import httpx
from bs4 import BeautifulSoup

def load_cookies():
    '''
    Load cookies from disk. Cookies are domain- and path-sensitive, so
    flattening the jar straight into a plain dict would lose that scoping;
    rebuild a proper httpx.Cookies jar instead.
    '''
    if not os.path.isfile("cookies.pk"):
        return None
    cookies = httpx.Cookies()
    with open("cookies.pk", "rb") as f:
        jar_cookies = pickle.load(f)
    # The pickled mapping is keyed as {domain: {path: {name: Cookie}}}.
    for domain, pc in jar_cookies.items():
        for path, c in pc.items():
            for k, v in c.items():
                cookies.set(k, v.value, domain=domain, path=path)
    return cookies

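
# The matching save step, shown here as a standalone sketch: close_client()
# at the bottom of the script does the same thing inline. It pickles the
# CookieJar's private _cookies mapping ({domain: {path: {name: Cookie}}} in
# CPython), which is fragile across Python versions but round-trips cleanly
# with load_cookies() above.
def save_cookies(cookies: httpx.Cookies):
    with open("cookies.pk", "wb") as f:
        pickle.dump(cookies.jar._cookies, f)
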
# One shared async client for the whole script, seeded with any saved cookies.
client = httpx.AsyncClient(cookies=load_cookies())

def get_header(image: bool = False) -> dict:
    '''
    Build the request headers.
    image: whether the request is for an image
    '''
    headers = {
        'referer': 'https://www.bcy.net/',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/91.0.4472.124 Safari/537.36',
    }
    if image:
        headers['accept'] = ('image/avif,image/webp,image/apng,'
                             'image/svg+xml,image/*,*/*;q=0.8')
    return headers

async def id2name(id: int) -> str:
    '''
    Resolve a circle id to its name.
    '''
    url = "https://www.bcy.net/tag/{id}".format(id=id)
    r = await client.get(url, headers=get_header())
    soup = BeautifulSoup(r.text, features="html.parser")
    # The circle name is the first " - "-separated field of the keywords meta tag.
    name = soup.find('meta', {'name': 'keywords'})['content'].split(" - ")[0]
    return name

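
# Parsing sketch: the tag page's keywords meta tag is assumed to look like
# "<circle name> - <site keywords>", hence the split above. With made-up content:
#   html = '<meta name="keywords" content="初音ミク - 半次元">'
#   BeautifulSoup(html, 'html.parser').find('meta')['content'].split(' - ')[0]
#   # -> '初音ミク'
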
async def load_name2id_cache():
    '''
    Load the name2id cache from cache.json.
    '''
    if not os.path.isfile('cache.json'):
        return None
    with open('cache.json', 'r', encoding='utf-8') as f:
        return json.load(f)

async def save_name2id_cache(cache):
    '''
    Save the name2id cache to cache.json.
    '''
    with open('cache.json', 'w', encoding='utf-8') as f:
        json.dump(cache, f, ensure_ascii=False)

async def name2id(name: str):
    '''
    Resolve a circle name to its id via the search page, with a local cache.
    '''
    cache = await load_name2id_cache()
    if cache is not None and name in cache:
        return cache[name]
    if cache is None:
        cache = {}
    url = "https://www.bcy.net/search/home?k={name}".format(name=name)
    r = await client.get(url, headers=get_header())
    # The page embeds its state as JSON.parse("..."); grab the string literal,
    # undo the escaped slashes, then decode twice (see the sketch below).
    raw = (re.findall(r'JSON.parse\(".*"\);', r.text)[0]
           .replace('JSON.parse(', '')
           .replace(');', '')
           .replace('\\\\u002F', '/'))
    data = json.loads(raw)
    data = json.loads(data)
    circles = data['all']['circle']['circle_list']
    cid = None
    for circle in circles:
        # Cache every circle from the results, not just the exact match.
        cache[circle['circle_name']] = circle['circle_id']
        if circle['circle_name'] == name:
            cid = circle['circle_id']
    await save_name2id_cache(cache)
    return cid

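
# Why json.loads runs twice above: the search page inlines its state as
# JSON.parse("..."), i.e. a JSON document wrapped in a string literal. The
# first loads decodes the literal into JSON text, the second parses that text.
# A self-contained demo with made-up data:
#   raw = '"{\\"all\\": {\\"circle\\": {\\"circle_list\\": []}}}"'
#   json.loads(json.loads(raw))
#   # -> {'all': {'circle': {'circle_list': []}}}
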
async def download_image(url: str, save_path: str = '', format: str = 'jpeg'):
    '''
    Download a single image by url.
    '''
    filename = url.split('/')[-1] + '.' + format
    filename = os.path.join(save_path, filename)
    # The ~noop.image suffix appears to serve the image without any
    # server-side transform.
    url = url + "~noop.image"
    r = await client.get(url, headers=get_header(image=True))
    with open(filename, 'wb') as f:
        f.write(r.content)

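
# Usage sketch (the URL is a placeholder, not a real CDN path):
#   await download_image('https://example.invalid/img/abc123',
#                        save_path='out', format='jpg')
#   # writes out/abc123.jpg
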
async def get_more_from_tag(circle_id: int, since: int = 0):
    '''
    Call the circleFeed API to fetch the posts in a circle.
    '''
    headers = get_header()
    url = ("https://www.bcy.net/apiv3/common/circleFeed"
           "?circle_id={cid}&since=rec:{since}&sort_type=1"
           "&grid_type=10").format(cid=circle_id, since=since)
    r = await client.get(url, headers=headers)
    return r.json()

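
# Usage sketch (made-up circle id): fetch one page of the feed. The
# since=rec:{since} query parameter looks like a paging cursor;
# download_image_from_tag below only ever fetches the first page (since=0).
#   feed = await get_more_from_tag(454, since=0)
#   for item in feed['data']['items']:
#       print(len(item['item_detail']['image_list']))
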
async def download_image_from_tag(circle_id: int, circle_name: str = None):
    '''
    Download the images from a circle by its id, into a folder named after it.
    '''
    if circle_name is None or circle_name.strip() == '':
        circle_name = str(circle_id)
    save_path = circle_name
    if not os.path.exists(save_path):
        os.mkdir(save_path)
    content = await get_more_from_tag(circle_id, since=0)
    items = content['data']['items']
    for item in items:
        image_list = item['item_detail']['image_list']
        for image in image_list:
            await download_image(image['path'], save_path=save_path,
                                 format=image['format'])

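
# A hypothetical variant, sketched under the same response shape as above:
# since the client is already async, the per-image downloads can be fanned
# out with asyncio.gather instead of being awaited one at a time.
async def download_image_from_tag_concurrent(circle_id: int,
                                             circle_name: str = None):
    if circle_name is None or circle_name.strip() == '':
        circle_name = str(circle_id)
    os.makedirs(circle_name, exist_ok=True)
    content = await get_more_from_tag(circle_id, since=0)
    tasks = [
        download_image(image['path'], save_path=circle_name,
                       format=image['format'])
        for item in content['data']['items']
        for image in item['item_detail']['image_list']
    ]
    await asyncio.gather(*tasks)
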
async def download_image_from_name(circle_name: str):
    '''
    Download the images from a circle by its name.
    '''
    cid = await name2id(circle_name)
    if cid is not None:
        await download_image_from_tag(cid, circle_name)

async def close_client():
    '''
    Close the client, persisting its cookies first (see save_cookies above).
    '''
    with open("cookies.pk", "wb") as f:
        pickle.dump(client.cookies.jar._cookies, f)
    await client.aclose()

async def main():
    # "COS正片" is the circle (tag) being scraped.
    await download_image_from_name("COS正片")
    await close_client()


if __name__ == '__main__':
    asyncio.run(main())