OSCHINA-MIRROR/shaozhupeng-SpiderRobot
CrawlTest.py
Szper · 13.08.2019 19:03 · 6b28228
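# CrawlTest.py collects standalone scraping exercises: requests + regex,
# pyquery parsing, Ajax APIs (Weibo, Toutiao), Selenium automation, Splash
# rendering and pytesseract OCR. Each function/class is independent; the
# __main__ block at the bottom shows how they were invoked.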
import requests
import re
import json
import os
from urllib.parse import urlencode
from pyquery import PyQuery as pq
from hashlib import md5
from multiprocessing.pool import Pool
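# --- Basic requests exercises: GET/POST, custom headers, cookies, SOCKS proxies, and re module demos ---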
def CrawlZhihu():
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
    }
    r = requests.get('http://www.zhihu.com/explore', headers=headers)
    pattern = re.compile('explore-feed.*?question_link.*?>(.*?)</a>', re.S)
    titles = re.findall(pattern, r.text)
    print(titles)
def CrawlGithub():
    r = requests.get("http://www.github.com/favicon.ico")
    with open('favicon.ico', 'wb') as f:
        f.write(r.content)
def CrawlZhihuNoHeaders():
    r = requests.get('http://www.zhihu.com/explore')
    #pattern = re.compile('explore-feed.*?question_link.*?>(.*?)</a>', re.S)
    #titles = re.findall(pattern, r.text)
    print(r.text)
def ZhihuPost():
    data = {'name': 'Shao', 'age': '22'}
    r = requests.post("http://httpbin.org/post", data=data)
    print(r.text)
def getZhihuCookie():
    r = requests.get('http://www.zhihu.com')
    print(r.cookies)
    for key, value in r.cookies.items():
        print(key + '=' + value)
def ZhihuCrawl():
    headers = {
        'Cookie': 'q_c1=72d454554731463ba72276953a44e7c6|1508403204000|1508403204000; _zap=834a1bd2-0d3c-4e6c-91de-cec730760801; __DAYU_PP=YIIFyqIYebqqQ7Fn3uUzffffffff8756221faaf9; tgw_l7_route=200d77f3369d188920b797ddf09ec8d1; _xsrf=f73dbd39-62f3-4208-a178-b2236540c46e; d_c0="AGDmsZrX0g2PTkeTyjnLtHmcUnUL9E6ihcE=|1530267680"; q_c1=72d454554731463ba72276953a44e7c6|1530267680000|1508403204000; capsion_ticket="2|1:0|10:1530267717|14:capsion_ticket|44:ZWQxZDRiZDI3MTFjNGMwY2I3MzMyZTEzMjlmM2NhZjA=|e62f1a8b0bf378a116b4b5514ff15ce13c77c7533d7c8b1288bbf873cdc8222a"; z_c0="2|1:0|10:1530267779|4:z_c0|92:Mi4xX0dvcUJnQUFBQUFBWU9heG10ZlNEU1lBQUFCZ0FsVk5nMVlqWEFCN291N05Pcng5aE1TeDgzV3lwZmpoemRhazln|cc420971f283ccb6de1130a75c7cd4902e90b76d7434f781606e0e7d6dd05445"; unlock_ticket="AHDC21x2gQwmAAAAYAJVTYsPNltD0w_2wOCC9Ugf1k3HZcJwbWnxMw=="',
        'Host': 'www.zhihu.com',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36',
    }
    r = requests.get('http://www.zhihu.com', headers=headers)
    print(r.text)
def ProxiesOfSocks():
    # Placeholder user/password/host/port; SOCKS support needs the requests[socks] extra.
    proxies = {
        'http': 'socks5://user:password@host:port',
        'https': 'socks5://user:password@host:port'
    }
    r = requests.get("https://www.taobao.com", proxies=proxies)
def ReTest():
    content = "Hello 123 4567 World_This is a Regex Demo"
    print(len(content))
    result = re.match(r"^Hello\s\d\d\d\s\d{4}\s\w{10}", content)
    print(result)
    print(result.group())
    print(result.span())
def TongyongPipei():
    content = "Hello 123 4567 World_This is a Regex Demo"
    result = re.match('^Hello.*Demo$', content)
    print(result)
    print(result.group())
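# Maoyan Top-100 board: fetch http://maoyan.com/board/4 and pull the ranking fields out with one regex.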
class GetMaoyan:
    def __init__(self):
        pass
    def get_one_page(self):
        url = "http://maoyan.com/board/4"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
        }
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.text
        return None
    def parse_html(self):
        #Test = self.get_one_page()
        print("===================")
        # Seven capture groups: index, image src, title, actors, release time,
        # integer and fraction parts of the score.
        pattern = re.compile(
            '<dd>.*?board-index.*?>(.*?)</i>.*?src="(.*?)".*?name.*?a.*?>(.*?)</a>.*?star.*?>(.*?)</p>.*?releasetime.*?>(.*?)</p>.*?integer.*?>(.*?)</i>.*?fraction.*?>(.*?)</i>.*?</dd>', re.S)
        print(self.get_one_page())
        html = self.get_one_page()
        items = re.findall(pattern, html)
        print(items)
        # for item in items:
        #     yield {
        #         'index': item[0],
        #         'image': item[1],
        #         'title': item[2].strip(),
        #         'actor': item[3].strip()[3:] if len(item[3]) > 3 else '',
        #         'time': item[4].strip()[5:] if len(item[4]) > 5 else '',
        #         'score': item[5].strip() + item[6].strip()
        #     }
        #     print(item)
    '''def write_to_file(self, content):
        with open('result.txt', 'a', encoding='utf-8') as f:
            print(type(json.dumps(content)))
            f.write(json.dumps(content, ensure_ascii=False) + '\n')
    def Remain(self):
        html = self.get_one_page()
        for item in self.parse_html(html):
            print(item)
            self.write_to_file(item)
        print(html)'''
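# Zhihu explore page: extract question, author and answer with pyquery and append them to explore.txt.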
def getZhihuContentAsFile():
    url = 'https://www.zhihu.com/explore'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
    }
    html = requests.get(url, headers=headers).text
    doc = pq(html)
    items = doc('.explore-tab .feed-item').items()
    for item in items:
        question = item.find('h2').text()
        author = item.find('.author-link-line').text()
        answer = pq(item.find('.content').html()).text()
        file = open('explore.txt', 'a', encoding='utf-8')
        file.write('\n'.join([question, author, answer]))
        file.write('\n' + '=' * 50 + '\n')
        file.close()
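# Weibo mobile Ajax API: page through one user's timeline via container/getIndex and dump posts to weibo.txt.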
class crawlWeibo:
    def __init__(self):
        self.headers = {
            'Host': 'm.weibo.cn',
            'Referer': 'https://m.weibo.cn/u/2830678474',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest'
        }
        # Mobile Weibo Ajax endpoint (matches the Host header above).
        self.base_url = 'https://m.weibo.cn/api/container/getIndex?'
    def get_page(self, page):
        params = {
            'type': 'uid',
            'value': '2830678474',
            'containerid': '1076032830678474',
            'page': page
        }
        url = self.base_url + urlencode(params)
        try:
            response = requests.get(url, headers=self.headers)
            if response.status_code == 200:
                return response.json(), page
        except requests.ConnectionError as e:
            print('Error', e.args)
    def parse_page(self, json, page: int):
        if json:
            items = json.get('data').get('cards')
            for index, item in enumerate(items):
                if page == 1 or index == 1:
                    continue
                else:
                    item = item.get('mblog')
                    weibo = {}
                    weibo['id'] = item.get('id')
                    weibo['text'] = pq(item.get('text')).text()
                    weibo['attitudes'] = item.get('attitudes_count')
                    weibo['comments'] = item.get('comments_count')
                    weibo['reposts'] = item.get('reposts_count')
                    yield weibo
    def write_to_txt(self):
        for page in range(1, 11):
            json = self.get_page(page)
            results = self.parse_page(*json)
            for result in results:
                print(type(result))
                file = open('weibo.txt', 'a', encoding='utf-8')
                file.write(str(result))
                file.write('\n' + '=' * 50 + '\n')
                file.close()
        print("Write Success")
class crawlToutiao:
    def __init__(self):
        self.base_url = 'http://www.toutiao.com/search_content/?'
        self.START = 1
        self.END = 20
    def get_page(self, offset):
        params = {
            'offset': offset,
            'format': 'json',
            'keyword': '妹子',
            'autoload': 'true',
            'count': '20',
            'cur_tab': '1',
            'from': 'search_tab'
        }
        url = self.base_url + urlencode(params)
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response.json()
        except requests.ConnectionError:
            return None
    def get_images(self, json):
        if json.get('data'):
            for item in json.get('data'):
                if item:
                    title = item.get('title')
                    images = item.get('image_list')
                    if images:
                        for image in images:
                            yield {
                                'image': image.get('url'),
                                'title': title
                            }
                else:
                    continue
    def save_images(self, item):
        if not os.path.exists(item.get('title')):
            os.mkdir(item.get('title'))
        try:
            local_image_url = item.get('image')
            new_image_url = local_image_url.replace('list', 'large')
            response = requests.get('http:' + new_image_url)
            if response.status_code == 200:
                # Name each file by the MD5 of its content to avoid duplicates.
                file_path = '{0}/{1}.{2}'.format(item.get('title'), md5(response.content).hexdigest(), 'jpg')
                if not os.path.exists(file_path):
                    with open(file_path, 'wb') as f:
                        f.write(response.content)
                else:
                    print('Already downloaded', file_path)
        except requests.ConnectionError:
            print('Failed to download')
    def buildOffset(self):
        return [x * 20 for x in range(self.START, self.END + 1)]
    def CrawlMain(self, offset):
        json = self.get_page(offset)
        for item in self.get_images(json):
            print(item)
            self.save_images(item)
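# Selenium basics: open Baidu in Firefox, submit a search and wait for the result container to appear.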
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
class seleniumLearn:
    def __init__(self):
        self.brower = webdriver.Firefox()
        self.url = 'http://www.baidu.com'
    def sele(self):
        try:
            self.brower.get(self.url)
            input = self.brower.find_element_by_id('kw')
            input.send_keys('Python')
            input.send_keys(Keys.ENTER)
            wait = WebDriverWait(self.brower, 10)
            wait.until(EC.presence_of_element_located((By.ID, 'content_left')))
            print(self.brower.current_url)
            print(self.brower.get_cookies())
            #print(self.brower.page_source)
        finally:
            self.brower.close()
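# Selenium ActionChains drag-and-drop demo plus switching between browser windows/tabs.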
from selenium import webdriver
from selenium.webdriver import ActionChains
import time
class dragTest:
    def __init__(self):
        self.brower = webdriver.Firefox()
        self.url = 'http://www.runoob.com/try/try.php?filename=jqueryui-api-droppable'
    def dragToTarget(self):
        self.brower.get(self.url)
        self.brower.switch_to.frame('iframeResult')
        source = self.brower.find_element_by_css_selector('#draggable')
        target = self.brower.find_element_by_css_selector('#droppable')
        actions = ActionChains(self.brower)
        actions.drag_and_drop(source, target)
        actions.perform()
    def ManageWindow(self):
        self.brower.get('http://www.baidu.com')
        self.brower.execute_script('window.open()')
        print(self.brower.window_handles)
        self.brower.switch_to.window(self.brower.window_handles[1])
        self.brower.get('http://www.taobao.com')
        time.sleep(1)
        self.brower.switch_to.window(self.brower.window_handles[0])
        self.brower.get('https://python.org')
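# Splash rendering service: run a Lua script through the /execute endpoint; the host IP and credentials are environment-specific.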
import requests
from urllib.parse import quote
import re
class splash_Test:
    def __init__(self):
        self.lua = '''
function main(splash, args)
  local treat = require("treat")
  local response = splash:http_get("http://httpbin.org/get")
  return treat.as_string(response.body)
end
'''
        self.url = 'http://10.251.230.23:8060/execute?lua_source=' + quote(self.lua)
    def main(self):
        response = requests.get(self.url, auth=('admin', '961213'))
        ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', response.text).group(1)
        print(ip)
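# Taobao search: drive pagination with Selenium explicit waits, parse the item list with pyquery and write JSON lines to taobao.txt.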
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from urllib.parse import quote
from pyquery import PyQuery as pq
import json
class CrawlTaobao:
    def __init__(self):
        self.brower = webdriver.Firefox()
        self.wait = WebDriverWait(self.brower, 10)
        self.KEYWORD = 'iPad'
        self.MAX_PAGE = 5
    def index_page(self, page):
        print("Crawling page", page)
        try:
            url = 'http://s.taobao.com/search?q=' + quote(self.KEYWORD)
            self.brower.get(url)
            if page > 1:
                input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '#mainsrp-pager div.form > input')))
                submit = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '#mainsrp-pager div.form > span.btn.J_Submit')))
                input.clear()
                input.send_keys(page)
                submit.click()
            self.wait.until(EC.text_to_be_present_in_element((By.CSS_SELECTOR, '#mainsrp-pager li.item.active > span'), str(page)))
            self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.m-itemlist .items .item')))
            self.get_products()
        except TimeoutException:
            self.index_page(page)
    def get_products(self):
        html = self.brower.page_source
        doc = pq(html)
        items = doc('#mainsrp-itemlist .items .item').items()
        for item in items:
            product = {
                'image': item.find('.pic .img').attr('data-src'),
                'price': item.find('.price').text(),
                'deal': item.find('.deal-cnt').text(),
                'title': item.find('.title').text(),
                'shop': item.find('.shop').text(),
                'location': item.find('.location').text()
            }
            print(product)
            self.save_to_file(product)
    def save_to_file(self, product):
        try:
            with open('taobao.txt', 'a', encoding='utf-8') as f:
                #json.dump(product, f)
                pro = json.dumps(product)
                f.write(pro)
                f.write('\n' + '=' * 50 + '\n')
            print("Saved successfully")
        except Exception:
            print("Failed to save")
    def main(self):
        for i in range(1, self.MAX_PAGE + 1):
            self.index_page(i)
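# OCR exercise: read a captcha image with PIL and pytesseract.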
import pytesseract
from PIL import Image
class tesserocrLearn:
    def __init__(self):
        pass
    def learn(self):
        image = Image.open('code.jpg')
        result = pytesseract.image_to_string(image)
        print(result)
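# Groundwork for a geetest slider-captcha exercise on login.flyme.cn (account/password intentionally left blank).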
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
PHONE = ''  # account
PASS = ''  # password
class SlideConfig:
    def __init__(self):
        self.url = 'https://login.flyme.cn'
        self.brower = webdriver.Firefox()
        self.wait = WebDriverWait(self.brower, 20)
        self.phone = PHONE
        self.password = PASS
    def get_geetest_button(self):
        self.brower.get(self.url)
        button = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_radar_tip')))
        return button
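# Entry point: earlier exercises are kept commented out; the active code runs the OCR demo on photo.jpg.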
if __name__ == '__main__':
    '''Tou = crawlToutiao()
    groups = Tou.buildOffset()
    pool = Pool()
    pool.map(Tou.CrawlMain, groups)
    pool.close()
    pool.join()'''
    #=========================
    '''Se = seleniumLearn()
    Se.sele()'''
    #=========================
    '''Drag = dragTest()
    Drag.ManageWindow()'''
    '''Splash = splash_Test()
    Splash.main()'''
    #=========================
    '''Tao = CrawlTaobao()
    Tao.main()'''
    #=========================
    '''Tes = tesserocrLearn()
    Tes.learn()'''
    #=========================
    image = Image.open('photo.jpg')
    print(image)
    result = pytesseract.image_to_string(image, lang='chi_sim')
    print("++++", result)
    '''slide = SlideConfig()
    button = slide.get_geetest_button()
    button.click()'''
