Commit 059c3a22 authored by 林禹's avatar 林禹

2021-6-15: 第一个爬虫作业

parent abee3fd9
......@@ -135,4 +135,20 @@ dmypy.json
.pytype/
# Cython debug symbols
cython_debug/
\ No newline at end of file
cython_debug/
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Dependency directories (remove the comment below to include it)
# vendor/
\ No newline at end of file
#file: ezpymysql.py
#Author: veelion
"""A lightweight wrapper around PyMySQL.
only for python3
"""
import time
import logging
import traceback
import pymysql
import pymysql.cursors
# Version of this wrapper module, as a display string and as a 4-tuple.
version = "0.7"
version_info = (0, 7, 0, 0)
class Connection(object):
    """A lightweight wrapper around a single PyMySQL connection.

    The connection is opened in ``__init__``, switched to autocommit, and
    re-opened automatically before a query when it has been idle for longer
    than ``max_idle_time`` seconds. Cursors are created with
    ``pymysql.cursors.DictCursor``.
    """

    def __init__(self, host, database, user=None, password=None,
                 port=0,
                 max_idle_time=7 * 3600, connect_timeout=10,
                 time_zone="+0:00", charset="utf8mb4", sql_mode="TRADITIONAL"):
        """Store the connection settings and attempt the first connection.

        Args:
            host: Either a path to a MySQL socket file (any value containing
                "/") or a ``host`` / ``host:port`` string.
            database: Name of the database to use.
            user: Optional user name.
            password: Optional password.
            port: Optional explicit port; when truthy it overrides both the
                port embedded in ``host`` and the 3306 default.
            max_idle_time: Idle seconds after which the connection is
                re-opened before the next query.
            connect_timeout: Passed through to ``pymysql.connect``.
            time_zone: Session time zone, applied through the init command.
            charset: Connection character set.
            sql_mode: Passed through to ``pymysql.connect``.

        A failure of the initial connection is logged rather than raised;
        the next query attempts to connect again.
        """
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)
        # Always defined so attribute access is safe for both socket-file
        # and TCP configurations; this class never assigns a real socket.
        self.socket = None

        args = dict(use_unicode=True, charset=charset,
                    database=database,
                    init_command=('SET time_zone = "%s"' % time_zone),
                    cursorclass=pymysql.cursors.DictCursor,
                    connect_timeout=connect_timeout, sql_mode=sql_mode)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306
        if port:
            args['port'] = port

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    @staticmethod
    def _is_duplicate_error(exc):
        """Return True when the first argument of *exc* is error code 1062.

        Code 1062 is the one this module treats as "duplicated item" and
        skips. Exceptions without arguments are never treated as duplicates.
        """
        return bool(exc.args) and exc.args[0] == 1062

    def _ensure_connected(self):
        """Reconnect when there is no connection or it has idled too long."""
        # Mysql by default closes client connections that are idle for
        # 8 hours, but the client library does not report this fact until
        # you try to perform a query and it fails.  Protect against this
        # case by preemptively closing and reopening the connection
        # if it has been idle for too long (7 hours by default).
        idle_for = time.time() - self._last_use_time
        if self._db is None or idle_for > self.max_idle_time:
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        """Return a new cursor on a connection that has been checked first."""
        self._ensure_connected()
        return self._db.cursor()

    def __del__(self):
        self.close()

    def close(self):
        """Closes this database connection."""
        # getattr guards against __init__ having failed before _db was set.
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self):
        """Closes the existing database connection and re-opens it."""
        self.close()
        self._db = pymysql.connect(**self._db_args)
        self._db.autocommit(True)

    def query(self, query, *parameters, **kwparameters):
        """Returns a row list for the given query and parameters.

        Keyword parameters, when given, take precedence over positional ones.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchall()
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        """Returns the (singular) row returned by the given query.

        The result is whatever ``cursor.fetchone()`` yields for the query.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.fetchone()
        finally:
            cursor.close()

    def execute(self, query, *parameters, **kwparameters):
        """Executes the given query, returning the lastrowid from the query.

        An error with code 1062 (duplicated item) is swallowed and ``None``
        is returned; any other error has its traceback printed and is
        re-raised. The cursor is closed in every case.
        """
        cursor = self._cursor()
        try:
            cursor.execute(query, kwparameters or parameters)
            return cursor.lastrowid
        except Exception as e:
            if self._is_duplicate_error(e):
                # Duplicates are deliberately ignored.
                return None
            traceback.print_exc()
            raise
        finally:
            cursor.close()

    insert = execute

    ## =============== high level method for table ===================

    def table_has(self, table_name, field, value):
        """Return the first row where ``field`` equals ``value``, if any.

        Args:
            table_name: Table to search. Interpolated into the SQL text, so
                it must be a trusted identifier.
            field: Column to select and compare. Also interpolated, so it
                must be a trusted identifier.
            value: Value to compare against; sent as a query parameter so
                the driver handles quoting and escaping.

        Returns:
            Whatever ``get`` returns for the query.
        """
        sql = 'SELECT %s FROM %s WHERE %s=%%s' % (field, table_name, field)
        return self.get(sql, value)

    def table_insert(self, table_name, item):
        '''Insert one row; item is a dict whose keys are table fields.

        String values are utf-8 encoded before being passed as query
        parameters. Returns the value produced by ``execute``. Error code
        1062 (duplicated item) is skipped and ``None`` is returned; for any
        other error the statement and the values are printed and the error
        is re-raised.
        '''
        fields = list(item)
        values = [
            value.encode('utf8') if isinstance(value, str) else value
            for value in item.values()
        ]
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        sql = 'INSERT INTO %s (%s) VALUES(%s)' % (table_name, fieldstr, valstr)
        try:
            return self.execute(sql, *values)
        except Exception as e:
            if self._is_duplicate_error(e):
                # just skip duplicated item
                return None
            traceback.print_exc()
            print('sql:', sql)
            print('item:')
            for field, value in zip(fields, values):
                vs = str(value)
                # Long values are summarized by length to keep output readable.
                if len(vs) > 300:
                    print(field, ' : ', len(vs), type(value))
                else:
                    print(field, ' : ', vs, type(value))
            raise

    def table_update(self, table_name, updates,
                     field_where, value_where):
        '''Update rows where ``field_where`` equals ``value_where``.

        updates is a dict of {field_update:value_update}. Table and field
        names are interpolated into the SQL text and must be trusted
        identifiers; every value, including ``value_where``, is sent as a
        query parameter.
        '''
        assignments = ','.join('%s=%%s' % field for field in updates)
        sql = 'UPDATE %s SET %s WHERE %s=%%s' % (
            table_name,
            assignments,
            field_where,
        )
        values = list(updates.values())
        values.append(value_where)
        self.execute(sql, *values)
if __name__ == '__main__':
    # Usage demo for the wrapper; the host, database and credentials below
    # are placeholders.
    conn = Connection('localhost', 'db_name', 'user', 'password')

    # Fetch a single record.
    select_one = 'select * from test_table where id=%s'
    row = conn.get(select_one, 2)

    # Fetch multiple records.
    select_many = 'select * from test_table where id>%s'
    rows = conn.query(select_many, 2)

    # Insert one row.
    insert_sql = 'insert into test_table(title, url) values(%s, %s)'
    new_id = conn.execute(insert_sql, 'test', 'http://a.com/')
    # Or, equivalently, through the alias.
    new_id = conn.insert(insert_sql, 'test', 'http://a.com/')

    # Insert one row with the higher-level helper.
    record = {'title': 'test', 'url': 'http://a.com/'}
    new_id = conn.table_insert('test_table', record)
import re
import requests
import tldextract
from lxml import etree
from ezpymysql import Connection
def save_to_db(url, html, db=None):
    """Insert one (url, text) pair into the ``sinatilte`` table.

    Args:
        url: Stored under the ``url`` field.
        html: Stored under the ``title`` field.
        db: Optional object exposing ``table_insert(table_name, item)``.
            When omitted, a connection is opened for this call only and
            closed again before returning.

    Returns:
        Whatever ``table_insert`` returns for the inserted item.
    """
    owns_connection = db is None
    if owns_connection:
        # NOTE(review): credentials are hard-coded; consider moving them to
        # configuration.
        db = Connection(
            'localhost',
            'sinanew',
            'root',
            '123456'
        )
    item = {
        'title': html,
        'url': url,
    }
    try:
        return db.table_insert('sinatilte', item)
    finally:
        # Only close what this call opened; a caller-supplied connection
        # stays open for reuse.
        if owns_connection:
            db.close()
def analysisA(htmlText):
    """Collect the href and first text node of every ``<a>`` element.

    Args:
        htmlText: A parsed document or element exposing ``xpath()``.

    Returns:
        A list of single-entry dicts ``{href: text}``, one for each anchor
        that has both an ``href`` attribute and at least one text node, in
        the order the ``//a`` query returned them.
    """
    news_linkDics = []
    for anchor in htmlText.xpath("//a"):
        # Evaluate each XPath once per anchor and reuse the results.
        hrefs = anchor.xpath("@href")
        texts = anchor.xpath("text()")
        if hrefs and texts:
            news_linkDics.append({hrefs[0]: texts[0]})
    return news_linkDics
def crawl():
    """Fetch the Sina news front page and store the links found on it.

    Every anchor returned by ``analysisA`` whose href starts with ``http``
    and whose registered domain is not ``sina`` is passed to
    ``save_to_db`` as (href, anchor text).
    """
    sina_url = "https://news.sina.com.cn/"
    # Set a browser-like request header to reduce the chance of being blocked.
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    # A timeout keeps the crawler from hanging forever on a stalled server.
    response = requests.get(sina_url, headers=_headers, timeout=10)
    # Decode the raw bytes as UTF-8 directly. Re-encoding ``response.text``
    # as ISO-8859-1 only works when that happens to be the charset requests
    # guessed, and raises UnicodeEncodeError otherwise.
    htmlText = response.content.decode('utf-8')

    news_links = []
    for entry in analysisA(etree.HTML(htmlText)):
        # Each entry is a single-pair dict {href: anchor text}.
        href, title = next(iter(entry.items()))
        if not href.startswith('http'):
            continue
        # Compare the extracted domain part, not the whole result object
        # (which never equals a plain string).
        # NOTE(review): this filter could not match before, so links on
        # sina domains were being stored; confirm skipping them is intended.
        if tldextract.extract(href).domain == "sina":
            continue
        news_links.append((href, title))

    for href, title in news_links:
        save_to_db(href, title)
# Run one crawl when this file is executed as a script.
if __name__ == '__main__':
    crawl()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment