import requests
import re
import json
import os
from urllib.parse import urlencode
from pyquery import PyQuery as pq
from hashlib import md5
from multiprocessing.pool import Pool
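# Fetch the Zhihu "explore" page with a browser-like User-Agent and pull question titles out of the HTML with a regex.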
def CrawlZhihu():
headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
}
r = requests.get('http://www.zhihu.com/explore', headers=headers)
pattern = re.compile('explore-feed.*?question_link.*?>(.*?)</a>', re.S)
titles = re.findall(pattern, r.text)
print (titles)
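# Download GitHub's favicon and save the raw response bytes to a local file.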
def CrawlGithub():
r = requests.get("http://www.github.com/favicon.ico")
with open('favicon.ico', 'wb') as f:
f.write(r.content)
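# Same Zhihu request without custom headers, to compare how the site responds when no User-Agent is set.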
def CrawlZhihuNoHeaders():
r = requests.get('http://www.zhihu.com/explore')
#pattern = re.compile('explore-feed.*?question_link.*?>(.*?)</a>', re.S)
#titles = re.findall(pattern, r.text)
print (r.text)
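# Send a simple form POST to httpbin.org and print the echoed request body.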
def ZhihuPost():
data = {'name':'Shao', 'age':'22'}
r = requests.post("http://httpbin.org/post", data=data)
print (r.text)
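# Print the cookies returned by Zhihu, both as a CookieJar and as key=value pairs.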
def getZhihuCookie():
r = requests.get('http://www.zhihu.com')
print(r.cookies)
for key, value in r.cookies.items():
print (key+'='+value)
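# Request Zhihu as a logged-in user by passing a Cookie header copied from a browser session.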
def ZhihuCrawl():
headers = {
'Cookie':'q_c1=72d454554731463ba72276953a44e7c6|1508403204000|1508403204000; _zap=834a1bd2-0d3c-4e6c-91de-cec730760801; __DAYU_PP=YIIFyqIYebqqQ7Fn3uUzffffffff8756221faaf9; tgw_l7_route=200d77f3369d188920b797ddf09ec8d1; _xsrf=f73dbd39-62f3-4208-a178-b2236540c46e; d_c0="AGDmsZrX0g2PTkeTyjnLtHmcUnUL9E6ihcE=|1530267680"; q_c1=72d454554731463ba72276953a44e7c6|1530267680000|1508403204000; capsion_ticket="2|1:0|10:1530267717|14:capsion_ticket|44:ZWQxZDRiZDI3MTFjNGMwY2I3MzMyZTEzMjlmM2NhZjA=|e62f1a8b0bf378a116b4b5514ff15ce13c77c7533d7c8b1288bbf873cdc8222a"; z_c0="2|1:0|10:1530267779|4:z_c0|92:Mi4xX0dvcUJnQUFBQUFBWU9heG10ZlNEU1lBQUFCZ0FsVk5nMVlqWEFCN291N05Pcng5aE1TeDgzV3lwZmpoemRhazln|cc420971f283ccb6de1130a75c7cd4902e90b76d7434f781606e0e7d6dd05445"; unlock_ticket="AHDC21x2gQwmAAAAYAJVTYsPNltD0w_2wOCC9Ugf1k3HZcJwbWnxMw=="',
'Host':'www.zhihu.com',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36',
}
r = requests.get('http://www.zhihu.com', headers=headers)
print (r.text)
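# Route requests through a SOCKS5 proxy; user/password/host/port are placeholders, and the requests[socks] extra (PySocks) is required.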
def ProxiesOfSocks():
proxies = {
        'http': 'socks5://user:password@host:port',
        'https': 'socks5://user:password@host:port'
}
r = requests.get("https://www.taobao.com", proxies=proxies)
def ReTest():
content = "Hello 123 4567 World_This is a Regex Demo"
print(len(content))
result = re.match("^Hello\s\d\d\d\s\d{4}\s\w{10}", content)
print (result)
print (result.group())
print (result.span())
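# "Universal match" demo: .* greedily matches everything between the two anchors.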
def TongyongPipei():
content = "Hello 123 4567 World_This is a Regex Demo"
result = re.match('^Hello.*Demo$', content)
print(result)
print (result.group())
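# Crawl the Maoyan Top 100 board and extract index, poster, title, actors, release time and score with a single regex.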
class GetMaoyan:
def __init__(self):
pass
def get_one_page(self):
url = "http://maoyan.com/board/4"
headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
return None
def parse_html(self):
#Test = self.get_one_page()
print ("===================")
pattern = re.compile(
'<dd>.*?board-index.*?>(.*?)</i>.*?src="(.*?)".*?name.*?a.*?>(.*?)</a>.*?star.*?>(.*?)</p>.*?releasetime.*?>(.*?)</p>.*?integer.*?></i>.*?fraction.*?>(.*?)</i>.*?</dd>', re.S)
print (self.get_one_page())
html = self.get_one_page()
items = re.findall(pattern,html)
print (items)
# for item in items:
# yield {
# 'index': item[0],
# 'image': item[1],
# 'title': item[2].strip(),
# 'actor': item[3].strip()[3:] if len(item[3]) > 3 else '',
# 'time': item[4].strip()[5:] if len(item[4]) > 5 else '',
# 'score': item[5].strip() + item[6].strip()
# }
#print (item)
'''def write_to_file(self, content):
with open('result.txt', 'a', encoding='utf-8') as f:
print(type(json.dumps(content)))
f.write(json.dumps(content, ensure_ascii=False)+'\n')
def Remain(self):
html = self.get_one_page()
for item in self.parse_html(html):
print (item)
self.write_to_file(item)
print (html)'''
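# Parse the Zhihu explore page with pyquery and append question/author/answer blocks to explore.txt.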
def getZhihuContentAsFile():
url = 'https://www.zhihu.com/explore'
headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36'
}
html = requests.get(url, headers=headers).text
doc = pq(html)
items = doc('.explore-tab .feed-item').items()
for item in items:
question = item.find('h2').text()
author = item.find('.author-link-line').text()
answer = pq(item.find('.content').html()).text()
        with open('explore.txt', 'a', encoding='utf-8') as file:
            file.write('\n'.join([question, author, answer]))
            file.write('\n' + '='*50 + '\n')
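# Crawl a Weibo user's timeline through the m.weibo.cn container API (Ajax JSON) and dump each post to weibo.txt.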
class crawlWeibo:
def __init__(self):
self.headers={
'Host': 'm.weibo.cn',
'Referer': 'https://m.weibo.cn/u/2830678474',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
            'X-Requested-With': 'XMLHttpRequest'
}
        self.base_url = 'https://m.weibo.cn/api/container/getIndex?'
def get_page(self, page):
params = {
'type': 'uid',
'value': '2830678474',
'containerid': '1076032830678474',
'page': page
}
url = self.base_url + urlencode(params)
try:
response = requests.get(url, headers = self.headers)
if(response.status_code == 200):
return response.json(), page
except requests.ConnectionError as e:
print('Error', e.args)
def parse_page(self, json, page: int):
if json:
items = json.get('data').get('cards')
for index, item in enumerate(items):
                if page == 1 and index == 1:
                    # skip the extra non-post card at this position on the first page
                    continue
else:
item = item.get('mblog')
weibo = {}
weibo['id'] = item.get('id')
weibo['text'] = pq(item.get('text')).text()
weibo['attitudes'] = item.get('attitudes_count')
weibo['comments'] = item.get('comments_count')
weibo['reposts'] = item.get('reposts_count')
yield weibo
def write_to_txt(self):
for page in range(1, 11):
json = self.get_page(page)
results = self.parse_page(*json)
for result in results:
print (type(result))
                with open('weibo.txt', 'a', encoding='utf-8') as file:
                    file.write(str(result))
                    file.write('\n' + '='*50 + '\n')
print("Write Success")
class crawlToutiao:
def __init__(self):
self.base_url = 'http://www.toutiao.com/search_content/?'
self.START = 1
self.END = 20
def get_page(self, offset):
params = {
'offset': offset,
'format': 'json',
'keyword': '妹子',
'autoload': 'true',
'count': '20',
'cur_tab': '1',
'from': 'search_tab'
}
url = self.base_url + urlencode(params)
try:
response = requests.get(url)
if(response.status_code == 200):
return response.json()
except requests.ConnectionError:
return None
def get_images(self, json):
if json.get('data'):
for item in json.get('data'):
if item:
title = item.get('title')
images = item.get('image_list')
if images:
for image in images:
yield {
'image': image.get('url'),
'title': title
}
else:
continue
def save_images(self, item):
if not os.path.exists(item.get('title')):
os.mkdir(item.get('title'))
try:
local_image_url = item.get('image')
new_image_url = local_image_url.replace('list', 'large')
response = requests.get('http:'+new_image_url)
if response.status_code == 200:
                file_path = '{0}/{1}.{2}'.format(item.get('title'), md5(response.content).hexdigest(), 'jpg')
if not os.path.exists(file_path):
with open(file_path, 'wb') as f:
f.write(response.content)
else:
print('Already downloaded', file_path)
except requests.ConnectionError:
            print('Failed to download')
def buildOffset(self):
return ([x * 20 for x in range(self.START, self.END + 1)])
def CrawlMain(self, offset):
json = self.get_page(offset)
for item in self.get_images(json):
print (item)
self.save_images(item)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
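# Selenium basics: open Baidu, type a query, press Enter and wait for the result container to appear.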
class seleniumLearn:
def __init__(self):
self.brower = webdriver.Firefox()
self.url = 'http://www.baidu.com'
def sele(self):
try:
self.brower.get(self.url)
input = self.brower.find_element_by_id('kw')
input.send_keys('Python')
input.send_keys(Keys.ENTER)
wait = WebDriverWait(self.brower, 10)
wait.until(EC.presence_of_element_located((By.ID, 'content_left')))
print(self.brower.current_url)
print (self.brower.get_cookies())
            # print(self.brower.page_source)
finally:
self.brower.close()
from selenium import webdriver
from selenium.webdriver import ActionChains
import time
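# Selenium ActionChains drag-and-drop demo on the runoob jQuery UI droppable page, plus switching between window handles.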
class dragTest:
def __init__(self):
self.brower = webdriver.Firefox()
self.url = 'http://www.runoob.com/try/try.php?filename=jqueryui-api-droppable'
def dragToTarget(self):
self.brower.get(self.url)
self.brower.switch_to.frame('iframeResult')
source = self.brower.find_element_by_css_selector('#draggable')
target = self.brower.find_element_by_css_selector('#droppable')
actions = ActionChains(self.brower)
actions.drag_and_drop(source, target)
actions.perform()
def ManageWindow(self):
self.brower.get('http://www.baidu.com')
self.brower.execute_script('window.open()')
print(self.brower.window_handles)
        self.brower.switch_to.window(self.brower.window_handles[1])
self.brower.get('http://www.taobao.com')
time.sleep(1)
        self.brower.switch_to.window(self.brower.window_handles[0])
self.brower.get('https://python.org')
import requests
from urllib.parse import quote
import re
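# Run a Lua script on a Splash instance through its /execute endpoint; the host, port and auth values are specific to this setup.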
class splash_Test:
def __init__(self):
self.lua = '''
function main(splash, args)
local treat = require("treat")
local response = splash:http_get("http://httpbin.org/get")
return treat.as_string(response.body)
end
'''
self.url = 'http://10.251.230.23:8060/execute?lua_source=' + quote(self.lua)
def main(self):
response = requests.get(self.url, auth=('admin', '961213'))
        ip = re.search(r'(\d+\.\d+\.\d+\.\d+)', response.text).group(1)
print (ip)
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from urllib.parse import quote
from pyquery import PyQuery as pq
import json
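# Drive Firefox through Taobao search result pages, jump to a page number via the pager input, and parse items with pyquery.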
class CrawlTaobao:
def __init__(self):
self.brower = webdriver.Firefox()
self.wait = WebDriverWait(self.brower, 10)
self.KEYWORD = 'iPad'
self.MAX_PAGE = 5
def index_page(self, page):
print ("正在抓取第 ", page, "页")
try:
url = 'http://s.taobao.com/search?q=' + quote(self.KEYWORD)
self.brower.get(url)
if(page > 1):
input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '#mainsrp-pager div.form > input')))
submit = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '#mainsrp-pager div.form > span.btn.J_Submit')))
input.clear()
input.send_keys(page)
submit.click()
self.wait.until(EC.text_to_be_present_in_element((By.CSS_SELECTOR, '#mainsrp-pager li.item.active > span'), str(page)))
self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, '.m-itemlist .items .item')))
self.get_products()
except TimeoutException:
            self.index_page(page)
def get_products(self):
html = self.brower.page_source
doc = pq(html)
items = doc('#mainsrp-itemlist .items .item').items()
for item in items:
product = {
'image': item.find('.pic .img').attr('data-src'),
'price': item.find('.price').text(),
'deal': item.find('.deal-cnt').text(),
'title': item.find('.title').text(),
'shop': item.find('.shop').text(),
'location': item.find('.location').text()
}
print (product)
self.save_to_file(product)
def save_to_file(self, product):
try:
with open('taobao.txt', 'a', encoding='utf-8') as f:
#json.dump(product, f)
                pro = json.dumps(product, ensure_ascii=False)
f.write(pro)
f.write('\n' + '='*50 + '\n')
                print("Saved successfully")
except Exception:
print ("存储失败")
def main(self):
for i in range(1, self.MAX_PAGE + 1):
self.index_page(i)
import pytesseract
from PIL import Image
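# OCR demo: open code.jpg and extract its text; despite the class name, pytesseract is the module imported and used here.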
class tesserocrLearn:
def __init__(self):
pass
def learn(self):
image = Image.open('code.jpg')
        result = pytesseract.image_to_string(image)
print (result)
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
PHONE = ''  # account (phone number)
PASS = ''  # password
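# First step of the Geetest slider-captcha flow on login.flyme.cn: load the page and wait for the verify button to become clickable.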
class SlideConfig:
def __init__(self):
self.url = 'https://login.flyme.cn'
self.brower = webdriver.Firefox()
self.wait = WebDriverWait(self.brower, 20)
self.phone = PHONE
self.password = PASS
def get_geetest_button(self):
self.brower.get(self.url)
button = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_radar_tip')))
return button
if __name__ == '__main__':
'''Tou = crawlToutiao()
groups = Tou.buildOffset()
pool = Pool()
pool.map(Tou.CrawlMain, groups)
pool.close()
pool.join()'''
#=========================
'''Se = seleniumLearn()
Se.sele()'''
#=========================
'''Drag = dragTest()
Drag.ManageWindow()'''
'''Splash = splash_Test()
Splash.main()'''
#=========================
'''Tao = CrawlTaobao()
Tao.main()'''
#=========================
'''Tes = tesserocrLearn()
Tes.learn()'''
#=========================
image = Image.open('photo.jpg')
print (image)
result = pytesseract.image_to_string(image,lang='chi_sim')
print ("++++",result)
'''slide = SlideConfig()
button = slide.get_geetest_button()
button.click()'''