一.tcp1-http
(一) utils.py
import socket
def get_server_socket(port=9000):
server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind( ("localhost",port) )
server.listen() # 设5 10 或不设置参数 开启线程或进程同时接受服务
return server
html_str_demo="""
<html>
<head>
<title>myhtml</title>
<meta charset="UTF-8"/>
</head>
<body>
<h1>菜鸟网址</h1>
<a href='https://www.runoob.com/'> <h1>网址</h1> </a>
</body>
</html>
"""
(二) HttpServer.py
import threading,multiprocessing
import socket
import time
from utils import html_str_demo
from utils import get_server_socket
class HTTPServer:
"""
客户端是浏览器 服务器不保留客户端连接状态 客户端出现很多请求如下:
1.主页面 GET /
2.GET /favicon.ico
3.若网页无内容显示还会重新加载请求 GET /
"""
def __init__(self,port=80):
"""
80端口是一个必争端口 属于系统核心的端口 确保本服务器能抢占
"""
self.server=get_server_socket()
print("【0】HttpServer服务器启动...")
print(self.server,"\n")
self.conn_count = 0 # 统计服务器连接在线客户端数量 ()
def start(self):
while True:
client_socket,client_addr=self.server.accept(port)
print("--- 新连接 ---\n")
print("ip:%s port:%s"%tuple(client_addr))
print("请求时间:",time.strftime("%Y-%m-%d %X"))
#将每客户端都设置一个独立线程或进程存在 分别进行请求的响应
handle_client_process=threading.Thread(
#handle_client_process=multiprocessing.Process(
target=self.handle_response,
args=(client_socket,)
)
handle_client_process.start()
def handle_response(self,client_socket:socket.socket):
print("---- 《step1 接受请求》 ----")
request_header = client_socket.recv(1024)
print(f"客户端: {client_socket.getpeername()}")
print("接收头信息长度:",len(request_header))
print(request_header.decode(),"\n")
print()
resp_start_line="HTTP/1.1 200 OK \r\n"
#\r\n结尾处 最好不出现空格 可能会出错误
resp_headers ="Server: myHTTP Server\r\n"
resp_headers+="Content-Type:text/html \r\n\r\n"
# 两次换行可再添更多头信息
resp_body=html_str_demo
response=resp_start_line + resp_headers + resp_body
client_socket.send( bytes(response,"UTF-8") )
print("---- 《step2 服务器发送响应信息》 ----")
print(response,"\n")
client_socket.close()
def main():
http_server=HTTPServer( 9000
)
http_server.start()
if __name__ == '__main__':
main()
(三)Client.py
import time,requests
from seleniumwire import webdriver
from playwright.sync_api import sync_playwright
def t1():
resp = requests.get("http://127.0.0.1:9000/")
resp.encoding = "utf8"
print("【1】", resp.status_code, resp.url)
print(resp.headers)
print("【2】",resp.text.rstrip(),"\n")
resp.close()
def t2():
"""
#1.谷歌浏览器 输入chrome://version/ 下载接近版本 谷歌启动器
#2 https://registry.npmmirror.com/binary.html?path=chromedriver/
#3 https://googlechromelabs.github.io/chrome-for-testing/
"""
print("--- t2 ----")
driver=webdriver.Chrome(executable_path="./chromedriver119.exe")
driver.get("http://localhost:9000/") #需要一些时间 获取完整信息
print("【1】",driver.requests)
print("【2】",driver.requests[0])
print("【3】\n",driver.requests[0].headers)
print("【4】",driver.requests[0].headers.get("User-Agent"))
print("【5】",driver.requests[0].response.__dict__)
print("【6】接受到服务器的头部信息\n")
print(driver.requests[0].response.headers)
for i , request in enumerate(driver.requests):
print(f"[{i+1}]",request)
print(driver.page_source)
time.sleep(1)
driver.quit()
def t3():
"""
https://www.jianshu.com/p/f68a8609a11b?v=1699534159329
实际上发送了两个请求的合并结果
1.浏览器会默认发送1个图标信息的请求 GET /favicon.ico HTTP/1.1
2.第二次请求 服务器并没有获限客户端的信息 但有重复发送的内容
"""
print("\n---- t3 ----")
with sync_playwright() as playwright:
browser = playwright.chromium.launch(headless=False)
context = browser.new_context()
page = context.new_page()
page.goto("http://localhost:9000/randon") #randon可改?
print(page.content()) #客户端两次请求的合并
time.sleep(2)
page.close()
context.close()
browser.close()
if __name__ == '__main__': #多个测试函数
#t1()
#t2()
t3()
#【思考】关于图标的问题favicon.ico,以后再完善
(四) 修改程序
#coding:utf8
import os
import time
from seleniumwire import webdriver
if not os.path.exists("html"):
os.mkdir("html")
driver = webdriver.Chrome(executable_path="./chromedriver119.exe")
driver.get("https://www.baidu.com")
time.sleep(2)
with open("html/baidu.html","wb")as f:
f.write(driver.page_source.encode())
driver.close()
"""
#server.py修改下面代码后 在浏览器运行 localhost:9000 百度网页呈现
#resp_body=html_str_demo
with open("html/baidu.html",mode="r",encoding="utf8")as f:
resp_body=f.read()
# port=9000访问正常 port=80访问不正常=出现源代码?
"""
二. tcp2-activeWeb
(一)init.py
#coding:utf8
"""
【动态网页】可以根据链接选择html文件 a3.py
subprocess.Popen > os.popen(cmd) > os.system(cmd)
"""
import os,sys
import subprocess
import time
print("【sys.path】导包路径")
for i,p in enumerate(sys.path):
print(f"[{i+1:<2}]",p)
if os.path.exists("mypages"):
print("慎重删除命令所有目录及其子目录和文件")
p=subprocess.Popen("cmd.exe",stdin=subprocess.PIPE,
stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p.stdin.write("rd /s mypages\n".encode("utf8"))
p.stdin.write("Y\n".encode("utf8")) #确认删除
p.stdin.close()
print(p.stdout.read().decode("gbk"))
p.stdout.close()
p.stderr.close()
time.sleep(3)
os.mkdir("mypages")
else:
os.mkdir("mypages")
model="""
#coding:utf8
def service(param):
if param:
return f"<h3>传递参数值={param}</h3>"
else:
return "<h3>is not parameters 没有传送参数</h3>"
""".strip()
with open(r"mypages/echo.py",mode="w",encoding="utf8")as f:
f.write(model)
init="""
#coding:utf8
import sys
sys.path.append("mypages")
""".strip()
f2=r"mypages/__init__.py"
with open(f2,mode="w",encoding="utf8") as f:
f.write(init)
(二) utils.py
# coding:utf8
import os.path
import sys, socket
pages=os.path.abspath("mypages")
if pages not in list(sys.path):
print("【utils.py】已成功导包",end="\t")
sys.path.append( pages )
print("临时导包 仅在本程序生效?",pages in list(sys.path))
def get_tcp_server_socket(
hostname="localhost", # "0.0.0.0" "127.0.0.1"
port: int = 9999,
max_client_connection_num:int = 5 ) -> socket.socket:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind( (hostname,port) )
server.listen(max_client_connection_num)
return server
if __name__ == '__main__':
get_tcp_server_socket()
(三) server.py
# coding:utf8
import re, threading, socket
import time
from utils import get_tcp_server_socket
class HTTPServer_Model: ## 做一个可以被继承的类模板 装饰器
def __init__(self, port=80):
self.server = get_tcp_server_socket()
print("HttpServer服务器启动...")
print(self.server)
self.conn_count = 0
def start(self):
while True:
client_socket, client_addr = self.server.accept()
self.conn_count += 1
name = f"【客户端-{self.conn_count}】"
print(f"{name} %s %s" % tuple(client_addr))
time.sleep(0.5) # 促使 线程中的每个请求 按照顺序进行
handle_client_process = threading.Thread(
name=name,
target=self.handle_response,
args=(client_socket,)
)
handle_client_process.start()
def handle_response(self, client_socket: socket.socket):
print(f"[step0] 接收请求 {threading.current_thread().name}")
request_header_bytes = client_socket.recv(1024)
request_header_str = request_header_bytes.decode()
print(len(request_header_bytes))
print(request_header_str)
"""
【测试】 每做一步 测试一步 即重启服务器
浏览器打开 http://localhost:9999/ 已接收3个请求?
"""
url = self._step1_get_request_url(request_header_str)
method,p=self._step2_get_execute_method(request_url=url)
self._step3_send(client_socket,method,p)
client_socket.close()
def _step1_get_request_url(self, request_header_str):
request_url=""
try:
print("[step1] 解析 请求url")
line0 = request_header_str.split("\r\n")[0]
print("请求头首行: %s" % line0)
request_url = re.match(r"\w+ +(\S*)", line0).group(1)
print("请求头请求链接: %s" % request_url)
except Exception as e:
print("【报错】",e)
return request_url
def _step2_get_execute_method(self,request_url,param="username"):
print("[step2] 解析 请求url 对应的 后台程序模块(py文件)及其方法")
method,param_value=None,None
if request_url.startswith("/mypages"):
try:
f"""
【测试】
http://localhost:9999/mypages/echo/service?param={param}
mypages/python文件的名称:第二个/ 至 第三个/ 的内容
"""
i1 = request_url.index("/", 1) + 1 # 字符串第1个字符开始查找
i2 = request_url.index("/", i1)
model_name = request_url[i1: i2]
print("py文件名称:%s 位置(%d,%d)" % (model_name, i1, i2))
i3 = request_url.index("?")
method_name = request_url[i2 + 1: i3]
print("model名称:%s" % method_name)
param_value=request_url[i3+len("param=")+1:]
print("param_value:",param_value)
model=__import__(model_name, fromlist=["mypages"])
# from mypages import echo
method = getattr(model,method_name)
print("method方法:",method)
#print(method(param_value))
except:
print("解析后台程序报错...")
return method,param_value
def _step3_send(self, client_socket: socket.socket,method,p):
html0 = """
<head><meta charset="utf8"></head> <!-- 页面出现乱码解决 -->
<h2>Do interact with the server dynamically..</h2>
"""
response_body = html0
if method is not None:
html1 = method(p)
response_body = response_body + str(html1)
response_start_line = "HTTP/1.1 200 OK \r\n"
response_headers = "Server: Yootk Server\r\n"
response_headers+="Content-Type:text/html \r\n\r\n"
response=response_start_line + response_headers + response_body
client_socket.send( bytes(response,"utf-8") )
if __name__ == '__main__':
u="http://localhost:9999/mypages/echo/service?param=username"
print("【浏览器测试】%s"%u)
HTTPServer_Model().start()