标签:合法性 rate 网络 word border rev 复杂 ase 作业
利用基本的Socket 通信,模仿远程cmd命令:
Server
import socket
# Remote-command server: the operator types a command, it is sent to the
# connected client, and the client's reply is printed.
sk = socket.socket()
sk.bind(('127.0.0.1', 8090))
sk.listen()
conn, addr = sk.accept()
try:
    while True:
        cmd = input('cmd : ')
        if not cmd:
            # send(b'') transmits nothing, so both peers would then sit in
            # recv() forever; ask for another command instead.
            continue
        conn.send(cmd.encode('utf-8'))
        if cmd == 'q':  # 'q' is forwarded first so the client exits as well
            break
        reply = conn.recv(1024)
        if not reply:  # an empty read means the client closed the connection
            break
        # A single recv() may hold only part of the output, or stdout and
        # stderr glued together: TCP is a byte stream without message borders.
        print('stdout : ', reply.decode('gbk'))
finally:
    # Release both sockets even if input() or a socket call raises.
    conn.close()
    sk.close()
Client
import socket
import subprocess  # built-in module; like parts of os, it can run operating-system commands

# Remote-command client: run every command received from the server in a
# shell and send the captured output back.
sk = socket.socket()
sk.connect(('127.0.0.1', 8090))
try:
    while True:
        cmd = sk.recv(1024).decode('utf-8')
        # An empty read means the server went away; 'q' is its quit request.
        if not cmd or cmd == 'q':
            break
        # SECURITY: shell=True executes whatever text the peer sends.
        res = subprocess.Popen(cmd, shell=True,       # run the text as a shell command
                               stdout=subprocess.PIPE,  # capture normal output
                               stderr=subprocess.PIPE)  # capture error output
        # communicate() drains both pipes at once; reading stdout to EOF and
        # only then stderr can deadlock once the child fills the stderr pipe.
        out, err = res.communicate()
        sk.sendall(out)
        sk.sendall(err)
finally:
    sk.close()
基本的UDP :
import socket
# UDP variant of the remote-command server.
udp = socket.socket(type=socket.SOCK_DGRAM)
udp.bind(('127.0.0.1', 8090))
# UDP has no connection: wait for the client's first datagram only to learn
# the address that commands have to be sent to.
_hello, peer = udp.recvfrom(1024)
while True:
    command = input('cmd : ')
    udp.sendto(command.encode('utf-8'), peer)
    if command == 'q':  # the quit request is forwarded before leaving
        break
    # The client answers with two datagrams: stdout first, then stderr.
    out_bytes = udp.recvfrom(2048)[0]
    print('stdout : ', out_bytes.decode('gbk'))
    err_bytes = udp.recvfrom(2048)[0]
    print('stderr : ', err_bytes.decode('gbk'))
udp.close()
import socket
import subprocess
# UDP variant of the remote-command client.
SERVER_ADDR = ('127.0.0.1', 8090)

sk = socket.socket(type=socket.SOCK_DGRAM)
# The payload is irrelevant; this first datagram only tells the server where
# to send its commands.
sk.sendto(b'111', SERVER_ADDR)
try:
    while True:
        cmd = sk.recvfrom(1024)[0].decode('utf-8')
        if cmd == 'q':
            break
        # SECURITY: shell=True executes whatever text the peer sends.
        res = subprocess.Popen(cmd, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes together and avoids the deadlock of
        # reading stdout to EOF while the child is blocked writing stderr.
        out, err = res.communicate()
        # NOTE(review): stdout is repeated 100 times as in the original,
        # presumably to demonstrate an oversized datagram - confirm before
        # removing. A datagram larger than the receiver's buffer is not
        # delivered intact.
        sk.sendto(out * 100, SERVER_ADDR)
        sk.sendto(err, SERVER_ADDR)
finally:
    sk.close()
粘包及简单解决方法:
使用struct模块来转换数据长度。
server:
import socket,struct,json
import subprocess
# Command server with a length-prefixed reply, so the client always knows
# where one reply ends (the remedy for "sticky packets").
# Wire format of a reply: 4-byte header length | JSON header | payload.
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# This is the key line, and it has to come before bind().
phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
phone.bind(('127.0.0.1', 8080))
phone.listen(5)
while True:
    conn, addr = phone.accept()
    try:
        while True:
            cmd = conn.recv(1024)
            if not cmd:  # client closed the connection
                break
            print('cmd: %s' % cmd)
            # SECURITY: shell=True executes whatever text the peer sends.
            res = subprocess.Popen(cmd.decode('utf-8'),
                                   shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            # communicate() reads both pipes concurrently; reading stderr to
            # EOF first can deadlock when the child fills the stdout pipe.
            out, err = res.communicate()
            print(err)
            # Error text wins: stdout is only sent when stderr is empty.
            back_msg = err if err else out
            headers = {'data_size': len(back_msg)}
            print(headers)
            head_json = json.dumps(headers)
            print(head_json)
            head_json_bytes = head_json.encode('utf-8')
            print(head_json_bytes)
            # 'i' packs a native int; the client reads 4 bytes for it.
            head = struct.pack('i', len(head_json_bytes))
            print(struct.unpack('i', head)[0])
            # sendall() keeps writing until everything is out; send() may
            # stop after a partial write.
            conn.sendall(head)             # 1. length of the header
            conn.sendall(head_json_bytes)  # 2. the header itself
            conn.sendall(back_msg)         # 3. the real content
    finally:
        conn.close()
# 服务端:定制稍微复杂一点的报头
client
from socket import *
import struct,json
def _recv_exact(sock, size):
    """Read exactly `size` bytes from `sock`.

    recv() may return fewer bytes than requested, so keep reading until the
    count is reached. Raises ConnectionError if the peer closes first.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        # Never ask for more than is missing, so the next message's bytes
        # stay in the socket buffer.
        chunk = sock.recv(min(remaining, 1024))
        if not chunk:
            raise ConnectionError('connection closed before the full message arrived')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


# Client for the length-prefixed command server.
# Reply format: 4-byte header length | JSON header | payload.
ip_port = ('127.0.0.1', 8080)
client = socket(AF_INET, SOCK_STREAM)
client.connect(ip_port)
while True:
    cmd = input('>>: ')
    if not cmd:
        continue
    client.sendall(cmd.encode('utf-8'))
    head = _recv_exact(client, 4)
    head_json_len = struct.unpack('i', head)[0]
    head_json = json.loads(_recv_exact(client, head_json_len).decode('utf-8'))
    data_len = head_json['data_size']
    # The original added the cumulative buffer length on every pass, so the
    # counter overshot and the loop stopped before all data had arrived.
    recv_data = _recv_exact(client, data_len)
    recv_size = len(recv_data)
    print(recv_size)
    # Windows defaults to GBK; decode as 'utf-8' instead if the server's
    # shell emits UTF-8.
    print(recv_data.decode('gbk'))
# 客户端
练习:up_down server:
import json
import socket
import os  # imported here: this snippet needs it only to sanitise the file name

# Upload server. The first message is a JSON header with the keys 'operate',
# 'filename' and 'filesize'; after the b'received!' acknowledgement the
# client streams exactly 'filesize' raw bytes.
sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.listen()
conn, addr = sk.accept()
try:
    content = conn.recv(1024).decode('utf-8')
    content_dic = json.loads(content)
    if content_dic['operate'] == 'upload':
        conn.send(b'received!')
        # Keep only the final path component: the name comes from the peer
        # and must not be able to point outside the working directory.
        filename = os.path.basename(content_dic['filename'])
        remaining = content_dic['filesize']
        with open(filename, 'wb') as f:
            while remaining > 0:
                chunk = conn.recv(min(1024, remaining))
                if not chunk:
                    # Peer closed early; without this check the loop would
                    # spin forever on empty reads.
                    break
                f.write(chunk)
                remaining -= len(chunk)
finally:
    conn.close()
    sk.close()
client:
import os
import json
import socket
# Upload client. Protocol: send a JSON header with 'operate', 'filename' and
# 'filesize', wait for the b'received!' acknowledgement, then stream the raw
# file bytes.
sk = socket.socket()
sk.connect(('127.0.0.1', 8080))


def get_filename(file_path):
    """Return the last component of `file_path`, i.e. the bare file name."""
    return os.path.basename(file_path)


try:
    # Choose the operation.
    operate = ['upload', 'download']
    for num, opt in enumerate(operate, 1):
        print(num, opt)
    num = int(input('请输入您要做的操作序号 : '))
    if num == 1:
        # Upload.
        # file_path = r'E:\python10\day33\作业.py'
        file_path = input('请输入要上传的文件路径 : ')
        # Tell the peer the name and size of the file that is coming.
        file_name = get_filename(file_path)
        dic = {'operate': 'upload',
               'filename': file_name,
               'filesize': os.path.getsize(file_path)}
        sk.sendall(json.dumps(dic).encode('utf-8'))
        # Waiting for the acknowledgement keeps the header and the first file
        # bytes from arriving in the same recv() on the server.
        ack = sk.recv(1024)
        if ack == b'received!':
            # Binary mode: any file type works and the byte count matches
            # the announced 'filesize'.
            with open(file_path, 'rb') as f:
                while True:
                    chunk = f.read(1024)
                    if not chunk:
                        break
                    sk.sendall(chunk)
    elif num == 2:
        # Download: not implemented yet.
        pass
finally:
    sk.close()
详细教程参考:http://www.cnblogs.com/Eva-J/articles/8244551.html
subprocess 模块 示例:
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import subprocess
import os

# ret = os.popen()  # with os.popen the result and the error text come back joined together.
# Usage example
res = subprocess.Popen('dir', shell=True,
                       stdout=subprocess.PIPE,  # capture both stdout and stderr in pipes
                       stderr=subprocess.PIPE)
# communicate() drains both pipes together; reading stdout to EOF before
# touching stderr can deadlock when the child produces a lot of error output.
out, err = res.communicate()
print(out.decode('gbk'))  # the result and the error text can be read separately
print(err.decode('gbk'))
hmac 模块:
server 在服务端完成验证
import os
import socket
import hmac
# Server side of an HMAC challenge-response check.
secret_key = b'egg'
sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.listen()


def check_conn(conn):
    """Return True if the peer on `conn` proves knowledge of `secret_key`.

    Sends 32 random bytes as a challenge and compares the peer's answer with
    the HMAC of that challenge computed locally.
    """
    msg = os.urandom(32)  # 32 random bytes
    conn.send(msg)
    # digestmod is mandatory since Python 3.8. MD5 is what older versions
    # used when it was omitted, so naming it keeps the digest unchanged.
    h = hmac.new(secret_key, msg, digestmod='md5')
    digest = h.digest()  # keyed hash of the challenge
    client_digest = conn.recv(1024)
    print(client_digest)
    return hmac.compare_digest(digest, client_digest)


conn, addr = sk.accept()
try:
    if check_conn(conn):
        print('legal')
    else:
        print('illegal')
finally:
    # Both outcomes close the connection; the listener is closed as well.
    conn.close()
    sk.close()
client
import socket
import hmac

# Client side of the HMAC challenge-response check: answer the server's
# random challenge with its HMAC under the shared key.
secret_key = b'egg'
sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
try:
    msg = sk.recv(1024)
    # digestmod is mandatory since Python 3.8. MD5 is what older versions
    # used when it was omitted, so naming it keeps the digest unchanged.
    h = hmac.new(secret_key, msg, digestmod='md5')
    digest = h.digest()
    sk.send(digest)
finally:
    sk.close()
利用hmac验证客户端的合法性:
import os
import socket
import hmac
def check_client(conn):
    """Challenge the peer on `conn` and report whether it knows the secret.

    Sends 32 random bytes, then expects the HMAC of those bytes keyed with
    the shared secret. Prints the verdict and returns True for a legitimate
    client, False otherwise.
    """
    secret_key = b'egg'  # shared secret
    send_str = os.urandom(32)
    conn.send(send_str)
    # digestmod is mandatory since Python 3.8. MD5 is what older versions
    # used when it was omitted, so naming it keeps the digest unchanged.
    hmac_obj = hmac.new(secret_key, send_str, digestmod='md5')
    secret_ret = hmac_obj.digest()  # bytes
    # compare_digest() takes the same time wherever the first mismatch is,
    # unlike ==, so the comparison leaks nothing about the expected value.
    if hmac.compare_digest(conn.recv(1024), secret_ret):
        print('合法的客户端')
        return True
    print('非法的客户端')
    return False
# Chat server that talks only to a client that passes check_client().
sk = socket.socket()
sk.bind(('127.0.0.1', 8090))
sk.listen()
conn, addr = sk.accept()
try:
    ret = check_client(conn)
    while ret:
        inp = input('>>>')
        if not inp:
            # send(b'') transmits nothing; both sides would then wait in
            # recv() for each other indefinitely.
            continue
        conn.send(inp.encode('utf-8'))
        msg = conn.recv(1024)
        if not msg:  # client closed the connection
            break
        print(msg.decode('utf-8'))
finally:
    conn.close()
    sk.close()
import socket
import hmac
# Chat client: answer the server's HMAC challenge, then alternate between
# printing the server's message and sending a reply.
sk = socket.socket()
sk.connect(('127.0.0.1', 8090))
try:
    recv = sk.recv(1024)
    # Digest the challenge the same way the server does.
    secret_key = b'egg'  # shared secret
    # digestmod is mandatory since Python 3.8. MD5 is what older versions
    # used when it was omitted, so naming it keeps the digest unchanged.
    hmac_obj = hmac.new(secret_key, recv, digestmod='md5')
    ret = hmac_obj.digest()
    sk.send(ret)
    msg = sk.recv(1024)
    # An empty read means the server closed the connection (for example
    # after rejecting the digest); stop instead of looping on a dead socket.
    while msg:
        print(msg.decode('utf-8'))
        inp = input('>>>')
        while not inp:
            # send(b'') transmits nothing, which would leave both peers
            # waiting in recv(); insist on a non-empty reply.
            inp = input('>>>')
        sk.send(inp.encode('utf-8'))
        msg = sk.recv(1024)
finally:
    sk.close()
socketserver 模块:
server
import socketserver
# The class name is arbitrary, but it must inherit from BaseRequestHandler.
class MyServer(socketserver.BaseRequestHandler):
    """Handler that greets the client and prints the first message it sends."""

    def handle(self):  # fixed method name
        client = self.request  # plays the role of `conn` in the earlier examples
        client.send(b'hello')
        text = client.recv(1024).decode('utf-8')
        print(text)
# ThreadingTCPServer provides concurrency through multithreading; it is
# given the listening address and the handler class written above.
listen_addr = ('127.0.0.1', 9000)
server = socketserver.ThreadingTCPServer(listen_addr, MyServer)
server.serve_forever()  # serve until the process is stopped
client
# tcp
# 粘包
# 在同一时间只能处理一个客户端的请求
import socket
# Test client for the socketserver demo: print the greeting, then send one
# line typed by the user.
client = socket.socket()
client.connect(('127.0.0.1', 9000))
greeting = client.recv(1024)
print(greeting)
reply = input('>>>').encode('utf-8')
client.send(reply)
client.close()
login server
import json
import hashlib
import socketserver
def md5_pwd(user, pwd):
    """Return the hex MD5 digest of `user` followed by `pwd`.

    Both strings are UTF-8 encoded; the user name acts as a per-user salt.
    NOTE(review): MD5 is kept because changing the algorithm would change
    every stored digest; it is weak for password storage.
    """
    md5_obj = hashlib.md5(user.encode('utf-8'))
    md5_obj.update(pwd.encode('utf-8'))
    return md5_obj.hexdigest()
def login(userinfo):
    """Check JSON credentials against the 'userinfo' file.

    `userinfo` is a JSON object with the keys 'username' and 'passwd'. Each
    line of the file holds `name|digest`, where the digest is what md5_pwd()
    produces. Prints a success message and returns True on a match, otherwise
    returns False.
    """
    user_dic = json.loads(userinfo)
    passwd = md5_pwd(user_dic['username'], user_dic['passwd'])
    with open('userinfo') as f:
        for line in f:
            # Drop the line ending first: otherwise the stored digest keeps
            # its '\n' and can never equal the computed one.
            user, sep, pwd = line.rstrip('\r\n').rpartition('|')
            if not sep:
                continue  # blank or malformed line: nothing to compare
            if user_dic['username'] == user and passwd == pwd:
                print('登录成功')
                return True
    return False
class MyServer(socketserver.BaseRequestHandler):
    """Handler that passes the first message of a connection to login()."""

    def handle(self):
        raw = self.request.recv(1024)
        login(raw.decode('utf-8'))
# Start the threaded login server with MyServer as its request handler.
listen_addr = ('127.0.0.1', 9000)
server = socketserver.ThreadingTCPServer(listen_addr, MyServer)
server.serve_forever()
login client
import json
import socket
ADDR = ('127.0.0.1', 9000)


def get_socket():
    """Create a TCP socket, connect it to ADDR and return it."""
    connection = socket.socket()
    connection.connect(ADDR)
    return connection


# Ask for the account.
username = input('username >>>')
passwd = input('password >>>')
# Connect only when both fields contain something besides whitespace.
if username.strip() and passwd.strip():
    sk = get_socket()
    payload = json.dumps({'username': username, 'passwd': passwd})
    sk.send(payload.encode('utf-8'))
    sk.close()
采用进程池方式启动的多进程 socket server:
import socket
from multiprocessing import Pool
def func(conn):
    """Serve one client: send a greeting, print its reply, close the socket.

    The connection is closed even when sending, receiving or decoding fails,
    so a misbehaving client cannot leak the socket in a pool worker.
    """
    try:
        conn.send(b'gooooood')
        print(conn.recv(1024).decode('utf8'))
    finally:
        conn.close()
if __name__ == '__main__':
    # Multi-process server: a pool of 5 workers, each accepted connection is
    # handed to func() in one of them.
    p = Pool(5)
    sk = socket.socket()
    sk.bind(('127.0.0.1', 8081))
    sk.listen()
    try:
        while 1:
            conn, addr = sk.accept()
            # Asynchronous submission: accept() continues immediately.
            # error_callback reports a worker failure; without it the
            # exception is dropped because the result is never fetched.
            p.apply_async(func, args=(conn,), error_callback=print)
    finally:
        # The loop ends only through an exception (for example Ctrl-C);
        # after the loop this call could never be reached.
        sk.close()
import socket

# Client used to test the pool server.
client = socket.socket()
client.connect(('127.0.0.1', 8081))
greeting = client.recv(1024)
ret = greeting.decode('utf8')
print(ret)
msg = input('>>>').encode('utf8')
client.send(msg)
client.close()
标签:合法性 rate 网络 word border rev 复杂 ase 作业
原文地址:https://www.cnblogs.com/FHBIAO/p/10174528.html