1.什么是socketserver模块
socketserver模块是一个在TCP协议让让一个服务端和多个客户端通信的一个模块
客户端代码
代码块
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
msg_s = input('>>>')
sk.send(msg_s.encode('utf-8'))
print(sk.recv(1024).decode('utf-8'))
sk.close()
服务端代码
代码块
import socketserver #用这个模块可以多个客户端和一个服务端通信
class MySocket(socketserver.BaseRequestHandler):
def handle(self):# 这个方法的名字是固定的,必须是这个名字
# 收发的逻辑代码
# self.request == conn
msg = self.request.recv(1024).decode('utf-8')
print(msg)
self.request.send(msg.upper().encode('utf-8'))
server = socketserver.TCPServer(('127.0.0.1',8080),MySocket)# 固定的
server.serve_forever()# 开启一个永久性的服务
2.实现客户端和服务端大文件的传输
客户端代码:
代码块
import socket
import os
import json
import struct
sk = socket.socket()
sk.connect(("127.0.0.1",8001))
menu = {"1":"upload","2":"download"}
for k,v in menu.items():
print(k,v)
num = input("请输入功能选项:")
if num == "1":
dic = {"opt":menu.get(num),"filename":None,"filesize":None}
file_path = input("请输入一个绝对路径:")# 文件的绝对路径
# E:\Python S14\day32\实现大文件的传输\11.mp4
filename = os.path.basename(file_path)# 文件名字
filesize = os.path.getsize(file_path)# 获取用户输入的路径中文件的大小
dic["filename"] = filename
dic["filesize"] = filesize
str_dic = json.dumps(dic) #完成字典的序列化
len_dic = len(str_dic)# 获取到字典的长度,是一个int类型的数据 假如是46或者146,这不重要
b_len_dic = struct.pack('i',len_dic)# 把46或者146统一用一个4bytes的数据表示字典的长度
sk.send(b_len_dic + str_dic.encode("utf-8"))# 将bytes类型的字典的长度 + bytes类型的字典的内容,一起发送给服务器
with open(file_path,"rb") as f:
while filesize:
content = f.read(1024) #将每次传输大小最多控制在1024个字节
sk.send(content)
filesize -= len(content)
elif num == "2":
pass
服务端代码
代码块
import socket
import json
import struct
sk = socket.socket()
sk.bind(("127.0.0.1",8001))
sk.listen()
conn,addr = sk.accept()
b_len_dic = conn.recv(4)
len_dic = struct.unpack('i',b_len_dic)[0]# 获取到字典的真实实际长度
# unpack得到的是一个元组,要取下标为0的位置
str_dic = conn.recv(len_dic).decode('utf-8') #得到字典
# str_dic = {"opt":menu.get(num),"filename":None,"filesize":None}
dic = json.loads(str_dic) #还原字典
if dic["opt"] == "upload":
filename = "new"+ dic["filename"]
with open(filename,"ab") as f:
while dic['filesize']:
content = conn.recv(1024)
f.write(content)
dic['filesize'] -= len(content)
elif dic["opt"] == "download":
# 客户端发来一个字典要执行的功能,以及客户端自己的绝对路径
# 服务器要返回这个绝对路径中所有文件及文件夹
# 客户端自己选择进入到哪一层目录下
# 服务器都要返回对应目录下所有文件及文件夹
# 客户随时选择某一个目录下的某一个文件进行下载
# 客户端发送来一个字典,包含了要进行的操作,要下载的文件的绝对路径,
# 根据绝对路径去读取文件内容
# 一边读,一遍发
pass
conn.close()
sk.close()
3.实现切换目录的作业,用户输入一个路径,服务端返回这个路径下所有的文件,如果用户输入..,那么返回上个目录层级,如果用户输入cd,让用户输入要切换的目录,服务器切换到对应的目录并展示目录下的文件
客户端代码
代码块
import socket
import os
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
def func(msg):
sk.send(msg.encode('utf-8')) # 发送
current_dir = sk.recv(1024).decode('utf-8') #接收
print(current_dir.split('--')) # 获取接收到当前目录的列表类型数据
abs_path = input('请输入您的根目录:')
func(abs_path) #执行函数,发送和接收
while 1:
cmd = input('请输入>>>')
# cd + 文件夹 ..
if cmd == '..':
func(cmd)
if cmd == 'cd':
filename = input('请输入一个文件夹名:')
func((cmd+' '+filename))
sk.close()
服务端代码
代码块
import socket
import os
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
conn,addr = sk.accept()
def send_data(conn,path):
'''你给我一个目录,我把目录发给client'''
lis_dir = os.listdir(path) #以列表形式获取当前目录下的东西
str_dir = '--'.join(lis_dir) #将列表拼接成字符串
conn.send(str_dir.encode('utf-8')) #发送给客户端
current_dir = conn.recv(1024).decode('utf-8')# 获取用户输入的绝对路径
# 以下再处理,都要根据当前路径去处理,无论是返回上一层,还是进入下一层
send_data(conn,current_dir)# 把用户输入的路径下的所有文件及文件夹返回给客户端
#D:/泛娱乐/骑行/第二季
while 1:
cmd = conn.recv(1024).decode('utf-8')
if cmd == '..':
current_dir = current_dir.split('/')[:-1] #拿到上个目录的列表,比如['D:','泛娱乐','骑行']
current_dir = '/'.join(current_dir) #把上个目录的里列表转为字符串,比如D:/泛娱乐/骑行
send_data(conn, current_dir)
else:
filename = cmd.split(' ')[1]# 获取用户输入的文件名字
current_dir =current_dir+'/'+filename # 将文件名字添加到当前路径下,组成一个完整的新路径
if os.path.isdir(current_dir):# 如果客户输入的文件名字是一个文件夹
send_data(conn, current_dir)
else:# 如果不是一个文件夹
conn.send('您输入的不是文件夹'.encode('utf-8'))
conn.close()
sk.close()
4.利用hashlib模块或者hmac模块进行md5加密计算身份验证
代码块
import hashlib
import hmac #相当于hashlib,在hashllib基础上的封装,更简单
import os
# print(os.urandom(16),len(os.urandom(16))) #获取一个随即位数的随机数
sor = b'wusir'
r_str = os.urandom(16)
#用hmac加密
md5_obj = hmac.new(sor,r_str)
r = md5_obj.digest()
print(r)
#用hsahlib加密
md5_obj = hashlib.md5(sor)
md5_obj.update(r_str)
r1 = md5_obj.hexdigest()
5.利用hmac完成登陆验证的例子
客户端代码
代码块
import socket
import hmac
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
sor = b'alex'
r_str = sk.recv(1024) # 获得服务端16位长度的bytes
md5_obj = hmac.new(sor,r_str)
result = md5_obj.digest()
sk.send(result) #将md5加密后发给服务端
msg = sk.recv(1024)
print(msg)
服务端代码
代码块
import socket
import hashlib
import os
import hmac
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
conn,addr = sk.accept()
sor = b'alex'
r_str = os.urandom(16)# 随机出一个16位长度的bytes
conn.send(r_str)
md5_obj = hmac.new(sor,r_str)
result = md5_obj.digest()
msg = conn.recv(1024) #接收客户端md5加密后的密文
if msg == result:
conn.send(b'success')
else:
conn.send(b'failed')
conn.close()
sk.close()
6.三次登陆
客户端代码
代码块
import socket
import hashlib
import json
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
dic = {'status':False,'username':None,'password':None}
c = 3 #允许用户登陆3次
while c:
username = input('请输入用户名')
password = input('请输入密码')
md5_obj = hashlib.md5(password.encode('utf-8'))
md5_obj.update(username.encode('utf-8'))
pawd_m = md5_obj.hexdigest() #得到一个加密算法
dic['username'] = username
dic['password'] = pawd_m
str_dic = json.dumps(dic)
sk.send(str_dic.encode('utf-8')) #发送序列化后的字典
# 服务器应该回复我一个这样的字典:
# 是否登录成功,如果没有登录成功是因为什么原因?
res_dic = sk.recv(1024).decode('utf-8')# str_dic
result = json.loads(res_dic)# dic = {status:False/True , username , password, reason}
if result['status']:
print('登录成功')
break
else:
print('失败,%s'%result['reason'])
c -= 1
sk.close()
服务端代码
代码块
import socketserver
import json
import hashlib
class MySocket(socketserver.BaseRequestHandler):
def handle(self):
sor = b'wusir'#
while 1:
str_dic = self.request.recv(1024).decode('utf-8')
# 接收到 一个字典,类似于{'status':False,'username':None,'password':None}
if not str_dic:break # 当客户端登录失败退出程序的情况下,这里会接收到一个空消息。
dic = json.loads(str_dic) #还原为原来字典
if not dic['status']:
'''状态是未登陆'''
with open('info','r',encoding='utf-8') as f:
# 文件内容的存储方式 用户名|密码
for info in f:
username,pawd_txt = info.strip().split('|')
if username == dic['username']:
'''用户存在,就对客户端发来的用户的加密 密码再次加密,与文件中对比'''
md5_obj = hashlib.md5(sor)
md5_obj.update(dic['password'].encode('utf-8'))
pawd = md5_obj.hexdigest()
if pawd_txt == pawd:
'''密码正确的情况下'''
dic['status'] = True
else:
dic['reason'] = '密码错误'
break
else:
'''用户不存在'''
dic['reason'] = '用户不存在'
# dic = {status:False , username , password, reason}
# dic = {status:True , username , password}
str_dic = json.dumps(dic)
self.request.send(str_dic.encode('utf-8'))
else:
'''已经是登录成功了'''
server = socketserver.TCPServer(('127.0.0.1',8080),MySocket)
server.serve_forever() #永久开启服务,不关闭
文件的内容,文件名:info
alex|d32aa373dec2e4ba7861190083d1da83
xiaoxue|0cb576a05f23a9557278f41329c8dee1
7.对用户名和密码加密后,发送到服务端,服务端再对密码进行加密,然后与数据库比对
代码块
import hashlib
sor=b'wusir' #盐
pwd='123456'
user='alex'
#客户端对密码进行第一次加密
md5_obj=hashlib.md5(pwd.encode('utf-8'))
md5_obj.update(user.encode('utf-8'))
res=md5_obj.hexdigest()
#服务端接收客户端传过来的密码,再利用盐,进行二次加密,然后与数据库比对。
md5_obj1=hashlib.md5(sor)
md5_obj1.update(res.encode('utf-8'))
res1=md5_obj1.hexdigest()
print(res1) #d32aa373dec2e4ba7861190083d1da83
8. prite()相当调用sys.stdout.write()
代码块
import sys
name='hello'
sys.stdout.write(name) #print()就相当于调用底层sys.stdout.write(name)
9. 如何通过cmd导入python第三方包
pip install 包的名字
例如:pip install Django
别跑,点个赞再走