python语言磁力搜索引擎源码公开,基于DHT协议

原创
小哥 2年前 (2023-05-22) 阅读数 32 #大杂烩

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_34844199/article/details/51558175
此前,我在写百度网盘爬虫和百度图片爬虫的时候,答应网友会花时间ok搜索的源代码是公开的,现在是时候兑现我们的承诺了。以下是爬虫的所有代码,完全公开。即使您知道如何编写程序也可以使用它,但请先安装它linux系统,具有公网条件,然后运行:

python startCrawler.py

有必要提醒您,数据库中有所有域代码。请自己创建一个表。这太简单了,就不多说了。同时,我也提供了下载地址,源代码位于:下载地址1 下载地址2

#!/usr/bin/env python

# encoding: utf-8

"""
author:haoning
create time:2015.8.1
"""
import hashlib
import os
import time
import datetime
import traceback
import sys
import random
import json
import socket
import threading
from hashlib import sha1 #进行hash加密
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue

import MySQLdb as mdb  #数据库连接器

import metautils
import downloadTorrent
from bencode import bencode, bdecode
import pygeoip

# Database connection settings.
# NOTE(review): credentials are hard-coded; move to config/env in production.
DB_HOST = "127.0.0.1"
DB_USER = "root"
DB_PASS = "root"

# Well-known public DHT bootstrap nodes (IP, port).
BOOTSTRAP_NODES = (
    ("67.215.246.10", 6881),
    ("82.221.103.244", 6881),
    ("23.21.224.150", 6881)
)
RATE = 1  # request-throttle divisor, re-rolled at runtime by DHTServer
TID_LENGTH = 2  # KRPC transaction-id length
RE_JOIN_DHT_INTERVAL = 3  # seconds between re-bootstrap checks
TOKEN_LENGTH = 2  # get_peers token length
INFO_HASH_LEN = 500000  # cap on queued info_hashes; 500k entries keeps memory bounded
CACHE_LEN = 100  # batch size before flushing the DB-update cache
WAIT_DOWNLOAD = 80  # batch size before dispatching download threads

geoip = pygeoip.GeoIP("GeoIP.dat")

def is_ip_allowed(ip):
    """Return True if *ip* geolocates to one of the whitelisted East-Asian regions."""
    country = geoip.country_code_by_addr(ip)
    # Quotes restored: the scraped source had bare names (NameError at runtime).
    return country in ("CN", "TW", "JP", "HK", "KR")

def entropy(length):
    """Return *length* random characters, each with ordinal 0-255."""
    return "".join(chr(randint(0, 255)) for _ in range(length))

def random_id():
    """Generate a random 20-byte DHT node id (sha1 digest of 20 random bytes)."""
    h = sha1()
    h.update(entropy(20))
    return h.digest()

def decode_nodes(nodes):
    """Decode a compact DHT node list (BEP 5).

    Each node occupies 26 bytes: 20-byte node id + 4-byte IPv4 + 2-byte
    big-endian port. Returns a list of (nid, ip, port) tuples; a payload
    whose length is not a multiple of 26 is treated as malformed and
    yields an empty list.
    """
    n = []
    length = len(nodes)
    if (length % 26) != 0:  # malformed payload: discard entirely
        return n

    for i in range(0, length, 26):
        nid = nodes[i:i + 20]
        ip = inet_ntoa(nodes[i + 20:i + 24])
        port = unpack("!H", nodes[i + 24:i + 26])[0]
        n.append((nid, ip, port))

    return n

def timer(t, f):
    """Run callable *f* once after *t* seconds on a background Timer thread."""
    Timer(t, f).start()

def get_neighbor(target, nid, end=10):
    """Forge an id sharing its first *end* bytes with *target*.

    Makes our node look like a close neighbor of *target* so remote nodes
    keep us in their routing tables.
    """
    return target[:end] + nid[end:]

class KNode(object):
    """A DHT contact: 20-byte node id plus its UDP endpoint."""

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port

class DHTClient(Thread):
    """Crawling half of the node: walks the DHT by sending find_node queries."""

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        # Bounded deque: when full, the oldest discovered nodes fall off.
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        """Bencode *msg* and send it over UDP; failures are best-effort ignored."""
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass

    def send_find_node(self, address, nid=None):
        """Query *address* for nodes near a random target (BEP 5 find_node)."""
        # Present an id close to the remote node's own id so it keeps us
        # in its routing table (the usual crawler "neighbor" trick).
        nid = get_neighbor(nid, self.nid) if nid else self.nid
        tid = entropy(TID_LENGTH)
        msg = {
            "t": tid,
            "y": "q",
            "q": "find_node",
            "a": {
                "id": nid,
                "target": random_id()
            }
        }
        self.send_krpc(msg, address)

    def join_DHT(self):
        """Bootstrap by querying the well-known entry nodes."""
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        """Re-bootstrap whenever the node queue runs dry; reschedules itself."""
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        """Main send loop: pop one node at a time and query it, paced so the
        whole queue turns over roughly once per second."""
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                pass  # queue momentarily empty; re_join_DHT will refill it
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)

    def process_find_node_response(self, msg, address):
        """Harvest the compact node list from a find_node reply into self.nodes."""
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
            if len(nid) != 20:
                continue
            if ip == self.bind_ip:  # skip ourselves (bind_ip is set by the subclass)
                continue
            self.nodes.append(KNode(nid, ip, port))

class DHTServer(DHTClient): #获得info_hash

def __init__(self, master, bind_ip, bind_port, max_node_qsize):
DHTClient.__init__(self, max_node_qsize)

self.master = master
self.bind_ip = bind_ip
self.bind_port = bind_port
self.speed=0

self.process_request_actions = {
"get_peers": self.on_get_peers_request,
"announce_peer": self.on_announce_peer_request,
}

self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.ufd.bind((self.bind_ip, self.bind_port))

timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

def run(self):
self.re_join_DHT()
while True:
try:
(data, address) = self.ufd.recvfrom(65536)
msg = bdecode(data)
self.on_message(msg, address)
except Exception:
pass

def on_message(self, msg, address):
global RATE #设置为全局数量
try:
if msg["y"] == "r":
if msg["r"].has_key("nodes"):
self.process_find_node_response(msg, address) #发现节点
elif msg["y"] == "q":
try:
self.speed+=1
if self.speed % 10000 ==0:
RATE=random.randint(1,3)
if RATE==2:
RATE=1
if RATE==3:
RATE=10
if self.speed>100000:
self.speed=0
if self.speed % RATE==0: #数据过多,占用cpu太多,分为限速,1,1,10
self.process_request_actions[msg["q"]](msg, address) #处理来自其他节点的请求,此过程获取info_hash

self.process_request_actions[msg["q"]](msg, address) #处理来自其他节点的请求,此过程获取info_hash

except KeyError:
self.play_dead(msg, address)
except KeyError:
pass

def on_get_peers_request(self, msg, address):
try:
infohash = msg["a"]["info_hash"]
tid = msg["t"]
nid = msg["a"]["id"]
token = infohash[:TOKEN_LENGTH]
msg = {
"t": tid,
"y": "r",
"r": {
"id": get_neighbor(infohash, self.nid),
"nodes": "",
"token": token
}
}
self.master.log(infohash, address)
self.send_krpc(msg, address)
except KeyError:
pass

def on_announce_peer_request(self, msg, address):
try:
infohash = msg["a"]["info_hash"]
token = msg["a"]["token"]
nid = msg["a"]["id"]
tid = msg["t"]

if infohash[:TOKEN_LENGTH] == token:
if msg["a"].has_key("implied_port ") and msg["a"]["implied_port "] != 0:
port = address[1]
else:
port = msg["a"]["port"]
self.master.log_announce(infohash, (address[0], port))
except Exception:
print error
pass
finally:
self.ok(msg, address)

def play_dead(self, msg, address):
try:
tid = msg["t"]
msg = {
"t": tid,
"y": "e",
"e": [202, "Server Error"]
}
self.send_krpc(msg, address)
except KeyError:
pass

def ok(self, msg, address):
try:
tid = msg["t"]
nid = msg["a"]["id"]
msg = {
"t": tid,
"y": "r",
"r": {
"id": get_neighbor(nid, self.nid)
}
}
self.send_krpc(msg, address)
except KeyError:
pass

class Master(Thread): #解析info_hash

def __init__(self):
Thread.__init__(self)
self.setDaemon(True)
self.queue = Queue()
self.cache = Queue()
self.count=0
self.mutex = threading.RLock() #可以重新输入锁,允许单个线程再次获取获取的锁?
self.waitDownload = Queue()
self.metadata_queue = Queue()
self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, oksousou, charset=utf8)
self.dbconn.autocommit(False)
self.dbcurr = self.dbconn.cursor()
self.dbcurr.execute(SET NAMES utf8)
self.visited = set()

def lock(self): #加锁
self.mutex.acquire()

def unlock(self): #解锁
self.mutex.release()

def work(self,item):

print "start thread",item
while True:
self.prepare_download_metadata()
self.lock()
self.download_metadata()
self.unlock()

self.lock()
self.got_torrent()
self.unlock()

def start_work(self,max):

for item in xrange(max):
t = threading.Thread(target=self.work, args=(item,))
t.setDaemon(True)
t.start()

加入团队时种子效率更高

def log_announce(self, binhash, address=None):
if self.queue.qsize() < INFO_HASH_LEN : #大于INFO_HASH_LEN不要加入团队,否则我们以后就没有时间处理了
if is_ip_allowed(address[0]):
self.queue.put([address, binhash]) #获得info_hash

def log(self, infohash, address=None):
if self.queue.qsize() < INFO_HASH_LEN: #大于INFO_HASH_LEN/2不要加入团队,否则我们以后就没有时间处理了
if is_ip_allowed(address[0]):
self.queue.put([address, infohash])

def prepare_download_metadata(self):

if self.queue.qsize() == 0:
sleep(2)

从queue中获得info_hash用来下载

address, binhash= self.queue.get()
if binhash in self.visited:
return
if len(self.visited) > 100000: #大于100000重置队列,我想我已经去过了
self.visited = set()
self.visited.add(binhash)

关注已经访问过的新内容info_hash

info_hash = binhash.encode(hex)
utcnow = datetime.datetime.utcnow()

self.cache.put((address,binhash,utcnow)) #加载缓存队列

def download_metadata(self):

if self.cache.qsize() > CACHE_LEN/2: #出站更新下载
while self.cache.qsize() > 0: #排空队列
address,binhash,utcnow = self.cache.get()
info_hash = binhash.encode(hex)
self.dbcurr.execute(SELECT id FROM search_hash WHERE info_hash=%s, (info_hash,))
y = self.dbcurr.fetchone()
if y:

更新最近的发现时间、请求数

self.dbcurr.execute(UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s, (utcnow, info_hash))
else:
self.waitDownload.put((address, binhash))
self.dbconn.commit()
if self.waitDownload.qsize() > WAIT_DOWNLOAD:
while self.waitDownload.qsize() > 0:
address,binhash = self.waitDownload.get()
t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue))
t.setDaemon(True)
t.start()

def decode(self, s):
if type(s) is list:
s = ;.join(s)
u = s
for x in (self.encoding, utf8, gbk, big5):
try:
u = s.decode(x)
return u
except:
pass
return s.decode(self.encoding, ignore)

def decode_utf8(self, d, i):
if i+.utf-8 in d:
return d[i+.utf-8].decode(utf8)
return self.decode(d[i])

def parse_metadata(self, data): #解析种子
info = {}
self.encoding = utf8
try:
torrent = bdecode(data) #编码后解析
if not torrent.get(name):
return None
except:
return None
detail = torrent
info[name] = self.decode_utf8(detail, name)
if files in detail:
info[files] = []
for x in detail[files]:
if path.utf-8 in x:
v = {path: self.decode(/.join(x[path.utf-8])), length: x[length]}
else:
v = {path: self.decode(/.join(x[path])), length: x[length]}
if filehash in x:
v[filehash] = x[filehash].encode(hex)
info[files].append(v)
info[length] = sum([x[length] for x in info[files]])
else:
info[length] = detail[length]
info[data_hash] = hashlib.md5(detail[pieces]).hexdigest()
return info

def got_torrent(self):
if self.metadata_queue.qsize() == 0:
return
binhash, address, data,start_time = self.metadata_queue.get()
if not data:
return
try:
info = self.parse_metadata(data)
if not info:
return
except:
traceback.print_exc()
return

temp = time.time()
x = time.localtime(float(temp))
utcnow = time.strftime("%Y-%m-%d %H:%M:%S",x) # get time now

info_hash = binhash.encode(hex) #磁力
info[info_hash] = info_hash

need to build tags

info[tagged] = False
info[classified] = False
info[requests] = 1
info[last_seen] = utcnow
info[create_time] = utcnow
info[source_ip] = address[0]

if info.get(files):
files = [z for z in info[files] if not z[path].startswith(_)]
if not files:
files = info[files]
else:
files = [{path: info[name], length: info[length]}]
files.sort(key=lambda z:z[length], reverse=True)
bigfname = files[0][path]
info[extension] = metautils.get_extension(bigfname).lower()
info[category] = metautils.get_category(info[extension])

try:
try:
print \n, Saved, info[info_hash], info[name], (time.time()-start_time), s, address[0]
except:
print \n, Saved, info[info_hash]
ret = self.dbcurr.execute(INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged, +
length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s),
(info[info_hash], info[category], info[data_hash], info[name], info[extension], info[classified],
info[source_ip], info[tagged], info[length], info[create_time], info[last_seen], info[requests]))
if self.count %50 ==0:
self.dbconn.commit()
if self.count>100000:
self.count=0
except:
print self.name, save error, self.name, info
traceback.print_exc()
return

if __name__ == "__main__":

    # Start the metadata-resolving master and its worker threads.
    master = Master()
    master.start_work(150)

    # Start the DHT server/crawler on the public UDP port.
    dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
    dht.start()
    dht.auto_send_find_node()

请注意,上面有一段代码需要下载种子,因此以下代码段非常重要:

#!/usr/bin/env python

# encoding: utf-8

"""
author:haoning
create time:2015.8.1
"""
from hashlib import sha1
import math
from socket import inet_ntoa
import socket
from struct import pack, unpack
from threading import Timer, Thread
from time import sleep, time

from bencode import bencode, bdecode
from startCrawler import entropy

# Fixed protocol string of the BitTorrent wire handshake (BEP 3).
BT_PROTOCOL = "BitTorrent protocol"
# Message id of extension-protocol messages (BEP 10).
BT_MSG_ID = 20
# Sub-id 0 inside an extension message marks the extended handshake itself.
EXT_HANDSHAKE_ID = 0

def random_id():
    """Generate a random 20-byte peer id (sha1 digest of 20 random bytes)."""
    digest = sha1()  # renamed from "hash", which shadowed the builtin
    digest.update(entropy(20))
    return digest.digest()

def send_packet(the_socket, msg):
    """Write *msg* to the socket as-is."""
    the_socket.send(msg)


def send_message(the_socket, msg):
    """Send a BitTorrent wire message: 4-byte big-endian length prefix + payload."""
    msg_len = pack(">I", len(msg))
    send_packet(the_socket, msg_len + msg)

def send_handshake(the_socket, infohash):
    """Send the BEP 3 handshake: pstrlen + pstr + reserved + info_hash + peer_id."""
    bt_header = chr(len(BT_PROTOCOL)) + BT_PROTOCOL
    # Reserved bytes with the extension-protocol bit (0x10 in byte 5) set,
    # advertising BEP 10 support.
    ext_bytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
    peer_id = random_id()
    packet = bt_header + ext_bytes + infohash + peer_id

    send_packet(the_socket, packet)

def check_handshake(packet, self_infohash):
    """Validate a peer's handshake reply.

    The protocol string length/content must match BT_PROTOCOL and the echoed
    info_hash must equal *self_infohash*; anything else returns False.
    """
    try:
        bt_header_len, packet = ord(packet[:1]), packet[1:]
        if bt_header_len != len(BT_PROTOCOL):
            return False
    except TypeError:
        # Empty/truncated packet: ord() had nothing to read.
        return False

    bt_header, packet = packet[:bt_header_len], packet[bt_header_len:]
    if bt_header != BT_PROTOCOL:
        return False

    packet = packet[8:]  # skip the 8 reserved bytes
    infohash = packet[:20]
    if infohash != self_infohash:
        return False

    return True

def send_ext_handshake(the_socket):
    """Send the BEP 10 extended handshake, advertising ut_metadata support."""
    msg = chr(BT_MSG_ID) + chr(EXT_HANDSHAKE_ID) + bencode({"m": {"ut_metadata": 1}})
    send_message(the_socket, msg)

def request_metadata(the_socket, ut_metadata, piece):
    """Request one metadata piece (BEP 9 msg_type 0) via the peer's ut_metadata id."""
    msg = chr(BT_MSG_ID) + chr(ut_metadata) + bencode({"msg_type": 0, "piece": piece})
    send_message(the_socket, msg)

def get_ut_metadata(data):
    """Extract the peer's ut_metadata message id from its extension handshake.

    Scans the raw bencoded reply for "_metadata", then skips the "i" that
    opens the bencoded integer. NOTE(review): reads a single character, so
    only ids 0-9 are handled — confirm that suffices in practice.
    """
    ut_metadata = "_metadata"
    index = data.index(ut_metadata) + len(ut_metadata) + 1
    return int(data[index])

def get_metadata_size(data):
    """Extract metadata_size (total metadata byte count) from the extension handshake.

    Scans the raw bencoded reply for "metadata_size", skips the "i" opening
    the bencoded integer, and reads digits up to the closing "e".
    """
    metadata_size = "metadata_size"
    start = data.index(metadata_size) + len(metadata_size) + 1
    data = data[start:]
    return int(data[:data.index("e")])

def recvall(the_socket, timeout=5):
    """Drain a socket in non-blocking mode.

    Stops once data has arrived and the line has been idle for *timeout*
    seconds, or after 2x *timeout* if nothing arrived at all. Returns the
    concatenated payload (empty string when nothing was received).
    """
    the_socket.setblocking(0)
    total_data = []
    begin = time()

    while True:
        sleep(0.05)
        if total_data and time() - begin > timeout:
            break  # got something and the line went quiet
        elif time() - begin > timeout * 2:
            break  # hard cap: nothing received at all
        try:
            data = the_socket.recv(1024)
            if data:
                total_data.append(data)
                begin = time()  # reset the idle clock on every chunk
        except Exception:
            pass  # EWOULDBLOCK etc. — just keep polling
    return "".join(total_data)

def download_metadata(address, infohash, metadata_queue, timeout=5):
metadata = None
start_time = time()
the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
the_socket.settimeout(timeout)
the_socket.connect(address)

handshake

send_handshake(the_socket, infohash)
packet = the_socket.recv(4096)

handshake error

if not check_handshake(packet, infohash):
return

ext handshake

send_ext_handshake(the_socket)
packet = the_socket.recv(4096)

get ut_metadata and metadata_size

ut_metadata, metadata_size = get_ut_metadata(packet), get_metadata_size(packet)

request each piece of metadata

metadata = []
for piece in range(int(math.ceil(metadata_size/(16.01024)))): #piece这是一个控制块,根据控制块下载数据
request_metadata(the_socket, ut_metadata, piece)
packet = recvall(the_socket, timeout) #the_socket.recv(1024
17)
metadata.append(packet[packet.index("ee")+2:])
metadata = "".join(metadata)

except socket.timeout:
pass
except Exception, e:
pass
finally:

print "metadata= %s" %(metadata)

the_socket.close() #返回前确保所有内容都已关闭socket
if metadata != None: #只允许非空的种子进入?
metadata_queue.put((infohash, address, metadata,start_time))

实际上,还有另一种方法可以通过libtorrent但这太贵了cpu所以我通常不使用他,如下所示:

# coding: utf8

import threading
import traceback
import random
import time
import os
import socket

import libtorrent as lt

# Shrink per-thread stacks: this crawler spawns very many threads.
threading.stack_size(200*1024)
# Global socket timeout so a stuck peer cannot hang a worker forever.
socket.setdefaulttimeout(30)

def fetch_torrent(session, ih, timeout):
name = ih.upper()
url = magnet:?xt=urn:btih:%s % (name,)
data =
params = {
save_path: /tmp/downloads/,
storage_mode: lt.storage_mode_t(2),
paused: False,
auto_managed: False,
duplicate_is_error: True}
try:
handle = lt.add_magnet_uri(session, url, params)
except:
return None
status = session.status()
handle.set_sequential_download(1)
meta = None
down_time = time.time()
down_path = None
for i in xrange(0, timeout):
if handle.has_metadata():
info = handle.get_torrent_info()
down_path = /tmp/downloads/%s % info.name()

print status, p, status.num_peers, g, status.dht_global_nodes, ts, status.dht_torrents, u, status.total_upload, d, status.total_download

meta = info.metadata()
break
time.sleep(1)
if down_path and os.path.exists(down_path):
os.system(rm -rf "%s" % down_path)
session.remove_torrent(handle)
return meta

def download_metadata(address, binhash, metadata_queue, timeout=20):
    """libtorrent-based variant: spin up a throwaway session, join the DHT and
    fetch metadata for *binhash*; always reports a result tuple on the queue
    (metadata is None on failure)."""
    metadata = None
    start_time = time.time()
    try:
        session = lt.session()
        r = random.randrange(10000, 50000)
        session.listen_on(r, r + 10)  # random listen port range per session
        session.add_dht_router("router.bittorrent.com", 6881)
        session.add_dht_router("router.utorrent.com", 6881)
        # Scraped source had "dht.transmission.com"; the actual Transmission
        # bootstrap host is dht.transmissionbt.com.
        session.add_dht_router("dht.transmissionbt.com", 6881)
        session.add_dht_router("127.0.0.1", 6881)
        session.start_dht()
        metadata = fetch_torrent(session, binhash.encode("hex"), timeout)
        session = None  # drop the session so libtorrent tears it down
    except:
        traceback.print_exc()
    finally:
        metadata_queue.put((binhash, address, metadata, start_time))
这个爬虫仍然消耗了我和其他在线专家的大量时间。请保持研究和开源的精神,多交流,分享给看过本博客的朋友。我已经建立了一个qq作为去转盘网站的官方团,现在人数不是很多。如果您有兴趣,请来看看。多个粉丝可以去转盘,玩得更开心,qq群号:512245829
————————————————
版权声明:本文为CSDN博主「qq_34844199」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_34844199/article/details/51558175

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门