在Django 4.0中实现Web方式执行系统命令和结果显示 (2) 里使用 Websocket 实现运行长时间的命令,测试 netstat 命令时,发现Channels 调用 CmdConsumer receive 的通道被异步打开的 subprocess.Popen 阻塞,即使 Popen 结束,从客户端发起Websocket close, 也会导致服务端抛出超时异常。
本文在(2)的基础上(项目名称不变),添加了多线程、忙碌标记、页面上添加了 Close Websocket 按钮。对比(2),代码变化,主要在 home/templates/home.html 和 home/consumers.py 。
1. 开发环境
Windows 10 Home (20H2) or Ubuntu 18.04
Python 3.8.1
Pip 21.3.1
Django: 4.0
Windows下搭建开发环境,请参考Windows下搭建 Django 3.x 开发和运行环境
Ubuntu下搭建开发环境,请参考Ubuntu下搭建 Django 3.x 开发和运行环境
2. 创建 Django 项目
> django-admin startproject djangoWebsocketDemo
3. 添加 App
> cd djangoWebsocketDemo
> python manage.py startapp home
生成的项目目录结构,参考 如何在Django中使用template和Bootstrap
修改 djangoWebsocketDemo/settings.py
ALLOWED_HOSTS = ['localhost', '192.168.0.5']
...
INSTALLED_APPS = [
...
'home',
]
...
# Create static folder
STATICFILES_DIRS = [
BASE_DIR / 'static',
]
4. 静态资源和模板
1) 静态资源
从 https://jquery.com/ 下载 jQuery 包, 添加到 :
static/js/jquery-1.12.2.min.js
* static 等中间目录如果不存在,请新建它们,下同。
2) 添加 home/templates/home.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Home Page</title>
{% load static %}
<script language="javascript" src="{% static 'js/jquery-1.12.2.min.js' %}"></script>
</head>
<body>
<h3>Home Page</h3>
<form class="form-horizontal" role="form" action="" method="post" novalidate>
{% csrf_token %}
<p>
<label>System Environment:</label><br>
<select name="os_type" id="os_type" style="height: 32px; width: 50%">
<option value="windows">Server run on Windows</option>
<option value="ubuntu">Server run on Ubuntu</option>
</select>
</p>
<p>
<label>Cmd Type:</label><br>
<select name="cmd_type" id="cmd_type" style="height: 32px; width: 50%">
<option value="system_cmd_shell">System command or shell</option>
<option value="django_cmd_shell">Django command or shell</option>
</select>
</p>
<p>
<label>Command or shell:</label><br>
<textarea name="cmd_str" id="cmd_str" style="height: 64px; width: 50%"></textarea>
</p>
<p>
<label>Websocket:</label><br/>
<input type="text" name="ws_url" id="ws_url" value="ws://{{ request.get_host }}/ws/command/exec/" style="height: 32px; width:50%" />
</p>
<p>
<button type="button" id="btn_execute" class="btn btn-default btn-sm" onClick="javascript: commandExec();">
Execute
</button>
<button type="button" id="btn_close_ws" class="btn btn-default btn-sm" onClick="javascript: closeWebsocket();" style="display: none;">
Close Websocket
</button>
</p>
</form>
<p> </p>
<div id="cmd_result" style="padding: 15px; background-color: #e2e2e2; width: 50%; font-size: 12px; min-height: 120px;">
</div>
<p> </p>
<script type="text/javascript">
console.log("Home Page");
var globalSocket = null;
$(document).ready(function() {
changeType();
$("#os_type").change(function(e) {
changeType();
});
$("#cmd_type").change(function(e) {
changeType();
});
});
function changeType() {
var osType = $("#os_type").val();
var cmdType = $("#cmd_type").val();
var cmdStrNode = $("#cmd_str");
if (osType == "windows") {
if (cmdType == "system_cmd_shell") {
cmdStrNode.val("dir C:\\");
cmdStrNode.removeAttr("disabled");
} else if (cmdType == "django_cmd_shell") {
cmdStrNode.val("DirCommand C:\\");
cmdStrNode.attr("disabled", "disabled");
} else {
}
} else if (osType == "ubuntu") {
if (cmdType == "system_cmd_shell") {
cmdStrNode.val("dir /");
cmdStrNode.removeAttr("disabled");
} else if (cmdType == "django_cmd_shell") {
cmdStrNode.val("DirCommand /");
cmdStrNode.attr("disabled", "disabled");
} else {
}
} else {
}
}
function commandExec() {
var cmdType = $("#cmd_type").val();
var cmdStr = $("#cmd_str").val();
if (cmdStr == '') {
alert("Please enter command or shell");
$("#cmd_str").focus();
return;
}
var wsUrl = $("#ws_url").val();
if (wsUrl == '') {
alert("Please enter url");
$("#ws_url").focus();
return;
}
if (globalSocket == null) {
$("#cmd_result").html('');
$("#btn_execute").attr("disabled", "disabled");
createWebsocket(wsUrl);
}
}
function createWebsocket(url) {
if (globalSocket != null || url == '')
return;
//console.log("createWebsocket(): url = ", url);
globalSocket = new WebSocket(url);
globalSocket.onopen = funcWSOpen;
globalSocket.onclose = funcWSClose;
globalSocket.onerror = funcWSError;
globalSocket.onmessage = funcWSMessage;
}
function closeWebsocket() {
if (globalSocket != null) {
//console.log("closeWebsocket(): send close");
globalSocket.send(JSON.stringify({ "operation": "close"}));
$("#btn_close_ws").attr("disabled", "disabled");
}
}
function funcWSOpen(e) {
//console.log("funcWSOpen(): ", e);
$("#cmd_result").html("Executing ... <br><br>");
$("#btn_close_ws").removeAttr("disabled");
$("#btn_close_ws").css("display", "");
var data = {
"operation": "command",
"param": {
"cmd_type": $("#cmd_type").val(),
"cmd_str": $("#cmd_str").val(),
}
}
globalSocket.send(JSON.stringify(data));
}
function funcWSClose(e) {
//console.log("funcWSClose(): ", e);
$("#cmd_result").append("<br>Websocket: close<br>");
$("#btn_execute").removeAttr("disabled");
$("#btn_close_ws").css("display", "none");
globalSocket = null;
}
function funcWSError(e) {
//console.error("funcWSError(): ", e);
$("#cmd_result").append("<br>Websocket: error<br>");
$("#btn_execute").removeAttr("disabled");
$("#btn_close_ws").css("display", "none");
globalSocket = null;
}
function funcWSMessage(e) {
//console.log("funcWSMessage(): e.data = ", e.data);
var dataObj = JSON.parse(e.data);
if (dataObj['ret'] == "data") {
$("#cmd_result").append(dataObj['message'] + "<br>");
} else if (dataObj['ret'] == "finish") {
console.log("funcWSMessage(): ", dataObj['description'])
$("#cmd_result").append("<br>Websocket: " + dataObj['description'] + "<br>");
globalSocket.close(3009)
} else if (dataObj['ret'] == "error") {
console.log("funcWSMessage(): ", dataObj['description']);
$("#cmd_result").append("<br>Websocket: " + dataObj['description'] + "<br>");
} else {
$("#cmd_result").append("<br>Websocket: invalid data format<br>");
}
}
</script>
</body>
</html>
5. 视图和路由
1) 添加 home/utils.py
import subprocess
def openPipe(rsyncStr, shell=True, b_print=True):
return subprocess.Popen(rsyncStr, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def systemExecute(rsyncStr, shell=True, b_print=True):
#print(rsyncStr)
p = openPipe(rsyncStr, shell, b_print)
out, err = p.communicate()
return out.decode("gbk", "ignore"), err.decode("gbk", "ignore")
def systemExecuteLines(rsyncStr, shell=True, b_print=True):
#print(rsyncStr)
p = openPipe(rsyncStr, shell, b_print)
out, err = p.communicate()
return out.decode("gbk", "ignore").splitlines(), err.decode("gbk", "ignore").splitlines()
* 显示中文时需要 decode 设置为 gbk
2) 修改 home/views.py
from django.shortcuts import render
# Create your views here.
def home(request):
return render(request, "home.html")
3) 修改 djangoWebsocketDemo/urls.py
from django.contrib import admin
from django.urls import path
from home import views
urlpatterns = [
path('', views.home, name='home'),
path('admin/', admin.site.urls),
]
6. Django 自定义命令
1) 添加 home/management/commands/DirCommand.py
from django.core.management.base import BaseCommand
from home.utils import systemExecute
class Command(BaseCommand):
help = "Run 'dir' command on Windows or Linux."
def add_arguments(self, parser):
parser.add_argument("path")
def handle(self, *args, **options):
data,err = systemExecute("dir " + options["path"])
if err:
print(err)
else:
print(data)
2) 命令行方式运行 DirCommand
> python manage.py DirCommand c:\ # On Windows
Volume in drive C is WINDOWS
Volume Serial Number is D46B-07AC
Directory of c:\
2021/12/28 10:57 <DIR> Program Files
2021/12/28 13:50 <DIR> Program Files (x86)
2021/12/20 12:15 <DIR> Users
2021/12/29 11:26 <DIR> Virtual
2022/01/05 18:18 <DIR> Windows
0 File(s) 0 bytes
...
$ python manage.py DirCommand / # On Ubuntu
bin dev root usr etc lib mnt var
home lib64 opt sbin sys cdrom
...
7. Channels
https://pypi.org/project/channels/
1) 安装 Channels
$ pip install channels # pip 版本 21.3.1,低版本pip可能无法安装
2) 修改 djangoWebsocketDemo/settings.py
INSTALLED_APPS = [
...
'channels', # Add
]
...
ASGI_APPLICATION = 'djangoWebsocketDemo.asgi.application' # Add
CHANNEL_LAYERS = { # 频道后端,这里采用内存存储,默认是redis
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer"
}
}
3) 添加 home/consumers.py
import json, _thread
from channels.generic.websocket import WebsocketConsumer
from home.utils import openPipe
class CmdConsumer(WebsocketConsumer):
pipe_status = 0 # 0 - PIPE ready; 1 - PIPE busying; 2 - PIPE stopping;
close_code = 0
def threadFunc(self, cmdStr):
p = openPipe(cmdStr)
if p:
self.pipe_status = 1
for line in p.stdout:
#print("threadFunc() -> p.stdout: self.pipe_status = " + str(self.pipe_status))
if self.pipe_status == 2:
break
self.send(text_data=json.dumps({
"ret": "data",
"message": line.decode("gbk", "ignore").rstrip("\r\n")
}))
if self.pipe_status != 2:
err = p.stderr.read()
if err:
self.send(text_data=json.dumps({
"ret": "error",
"description": err.decode("gbk", "ignore").rstrip("\r\n")
}))
#print("threadFunc() -> p.stdout: self.pipe_status = " + str(self.pipe_status) + ", close_code = " + str(self.close_code))
if self.pipe_status == 2 and self.close_code == 0:
self.send(text_data=json.dumps({
"ret": "finish",
"description": "Finish after shutdown PIPE"
}))
self.pipe_status = 0
else:
self.send(text_data=json.dumps({
"ret": "error",
"description": "Unable to open PIPE"
}))
def connect(self):
self.accept()
def disconnect(self, close_code):
print("disconnect(): close_code = " + str(close_code) + ", pipe_status = " + str(self.pipe_status))
self.close_code = close_code
if self.pipe_status == 1:
self.pipe_status = 2
def receive(self, text_data):
print("receive(): text_data = ", text_data)
message = dict(json.loads(text_data))
if message["operation"] == "command":
#print("receive() -> command: pipe_status = " + str(self.pipe_status))
if self.pipe_status == 0:
execStr = ""
if message['param']['cmd_type'] == "system_cmd_shell":
execStr = message['param']['cmd_str']
elif message['param']['cmd_type'] == "django_cmd_shell":
execStr = "python manage.py " + message['param']['cmd_str']
if execStr != "":
try:
_thread.start_new_thread(self.threadFunc, (execStr, ))
except:
self.send(text_data=json.dumps({
"ret": "error",
"description": "Unable to start thread"
}))
else:
self.send(text_data=json.dumps({
"ret": "error",
"description": "Invalid command type"
}))
else:
self.send(text_data=json.dumps({
"ret": "error",
"description": "PIPE is busying"
}))
elif message["operation"] == "close":
#print("receive() -> close: pipe_status = " + str(self.pipe_status))
if self.pipe_status == 0:
self.send(text_data=json.dumps({
"ret": "finish",
"description": "Finish directly"
}))
elif self.pipe_status == 1:
self.pipe_status = 2
else:
self.send(text_data=json.dumps({
"ret": "error",
"description": "Invalid data format"
}))
4) 添加 home/routing.py
from django.urls import path
from home import consumers
websocket_urlpatterns = [
path("ws/command/exec/", consumers.CmdConsumer.as_asgi()),
]
5) 修改 djangoWebsocketDemo/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from home import routing
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoWebsocketDemo.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
routing.websocket_urlpatterns
)
),
})
8. 运行
> python manage.py runserver
访问 http://localhost:8000/
> python manage.py runserver 192.168.0.5:8080
访问 http://192.168.0.5:8080/