多人/终端 文件共同编辑器的实现-python版

一. 前言

本文探讨多人在线文件编辑器的实现,主要借助PyQt5来进行图形化界面实现。借助消息中间件activemq来进行消息的订阅和转发,使用MQTT作为网络协议。
activemq原理:服务器端创建一个唯一订阅号,发送者可以向这个订阅号中发东西,然后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器其实是一个消息中转站。

二. 开发环境

Windows10
activemq5.16.1
python3.8+PyQt5+cryptography+paho-mqtt

三. 具体过程

1.Windows10下环境配置

  1. 到官网https://activemq.apache.org/components/classic/download/下载apache-activemq-5.16.1并解压

    官网下载

  2. 命令行输入:

 - cd [activemq_install_dir]
 - bin\activemq start

启动成功:


启动成功后的显示
  1. 使用pip安装Python扩展PyQt5+cryptography+paho-mqtt
    pip/conda install即可

2.核心思路及代码

1. 初始化界面的设定

初始化界面展示:

class Main(QMainWindow):

    HOST = "127.0.0.1"
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)

        self.patch_stack = []       #输入堆栈
        self.author = False         #设定主机
        self.known_authors = []     #用户列表
        self.patch_set = []   # 存储doc对象

        #显示文本框要求输入
        resp, ok = QInputDialog.getText(
            self, "导引", "粘贴你收到的ID来加入或者不输入来创建自己的房间"
        )

        #如果没有输入则生成一个,并设置author为True即为主机,site=0
        if not resp:
            self.portal_id, self.pad_name, self.fernet_key = self.generate_portal_tuple()
            self.author = True
            print(f"分享这个id给其他人:\n  {self.portal_id}\n\n")
        else:
            self.pad_name, self.fernet_key = self.parse_portal_id(resp)

        self.fernet = Fernet(self.fernet_key)

        self.site = 0
        #如果不是主机则随机生成用户标识号site
        if not self.author:
            self.site = int(random.getrandbits(32))

        self.known_authors.append(self.site)

generate_portal_tuple()产生房间id具体实现方法,使用ip地址作为头,uuid生成pad,Fernet对称加密算法生成密钥key,然后把它们组合起来:

    #生成分享ID
    def generate_portal_tuple(self, include_server=False):
        pad = uuid.uuid4().hex
        key = Fernet.generate_key().decode()

        temp_str = ""
        if include_server:
            temp_str += f"{self.HOST}:"

        temp_str += f"{pad}:{key}"

        return base64.b64encode(temp_str.encode()).decode(), pad, key.encode()

解析方法调用decode即可:

    #解析分享ID
    def parse_portal_id(self, portal_id):
        tup = base64.b64decode(portal_id.encode()).decode().split(":")

        if len(tup) == 2:
            pad, key = tup
            return pad, key.encode()
        elif len(tup) == 3:
            server, pad, key = tup
            return server, pad, key.encode()

2. 连接及topic设置

先设置一波topic用来标识各种行为,使用字典存储。之后各种输入,或者用户加入退出等行为都将通过这些topic来标识转发

        self.known_authors.append(self.site)
        self.mqtt_name = f"jwy/pad/{self.pad_name}"
        self.subs = {
            self.mqtt_name + "/aloha": self.on_topic_aloha,
            self.mqtt_name + "/patch": self.on_topic_patch,
            self.mqtt_name + "/authors/enter": self.on_topic_authors,
            self.mqtt_name + "/authors/set": self.on_topic_authors,
            self.mqtt_name + "/authors/leave": self.on_topic_authors
        }

再设置连接参数及连接事件处理

        #初始化客户端,设置连接和接收消息事件
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.HOST, 1883, 60)

on_connect方法实现:

    #重写连接上之后的方法,订阅各种topic,发布进入通知
    def on_connect(self, client, userdata, flags, rc):
        for topic in self.subs.keys():
            self.client.subscribe(topic, qos=2)

        self.client.publish(self.mqtt_name + "/authors/enter", str(self.site))

        #如果不是主机,发送进入消息,更新用户列表
        if not self.author:
            self.client.publish(self.mqtt_name + "/aloha", str(self.site))

on_message方法,接收到消息之后查询字典调用对应topic的方法:

    #接收到消息之后查询字典调用对应topic的方法
    def on_message(self, client, userdata, msg):
        self.subs[msg.topic](msg.topic, msg.payload)

aloha topic上的事件,更新主机的用户列表

    #设置aloha topic上的事件
    def on_topic_aloha(self, topic, payload):
        if self.author:

            set_dict = {
                "dst": int(payload),
                "authors": self.known_authors
            }

            print("Main author procedure: Sending known authors...")
            self.client.publish(self.mqtt_name + "/authors/set", json.dumps(set_dict))

            print("Main author procedure: Sending known patches...")

            for patch in self.patch_set:
                payload = self.fernet.encrypt(patch.encode())
                self.client.publish(self.mqtt_name + "/patch", payload, qos=2)

            print("Main author procedure: Done")

3. 文本编辑界面初始化

定义Editor类来进行文本的编辑,继承QTextEdit

#文本编辑器界面
class Editor(QTextEdit):
    upd_text = pyqtSignal(str)  # in
    change_evt = pyqtSignal(str)  # out
    res_state_evt = pyqtSignal(str)  # out

    def __init__(self, site):
        self.view = QPlainTextEdit.__init__(self)
        self.setFrameStyle(QFrame.NoFrame)

        self.font = QFont()
        self.font.setStyleHint(QFont.Monospace)
        self.font.setFixedPitch(True)
        self.font.setPointSize(16)
        self.setFont(self.font)

        self.doc = Doc()
        self.doc.site = site

        self.upd_text.connect(self.on_upd_text)

    #鼠标点击事件
    def keyPressEvent(self, e):
        cursor = self.textCursor()

        #粘贴处理
        if e.matches(QKeySequence.Paste) and QApplication.clipboard().text():
            pos = cursor.position()
            for i, c in enumerate(QApplication.clipboard().text()):
                patch = self.doc.insert(pos + i, c)
                self.change_evt.emit(patch)
        #删除处理
        elif e.key() == Qt.Key_Backspace:
            if not self.toPlainText():
                return

            sel_start = cursor.selectionStart()
            sel_end = cursor.selectionEnd()
            if sel_start == sel_end:
                patch = self.doc.delete(cursor.position() - 1)
                self.change_evt.emit(patch)
            else:
                for pos in range(sel_end, sel_start, -1):
                    patch = self.doc.delete(pos - 1)
                    self.change_evt.emit(patch)
        #插入并且ctrl键没被按下
        elif e.key() != Qt.Key_Backspace and e.text() and e.modifiers() != Qt.ControlModifier:
            sel_start = cursor.selectionStart()
            sel_end = cursor.selectionEnd()
            if sel_start != sel_end:
                for pos in range(sel_end, sel_start, -1):
                    patch = self.doc.delete(pos - 1)
                    self.change_evt.emit(patch)

            patch = self.doc.insert(sel_start, e.text())
            self.change_evt.emit(patch)

        self.res_state_evt.emit(json.dumps(self.doc.patch_set))

        QTextEdit.keyPressEvent(self, e)

设置文本更新事件

    @pyqtSlot(str)
    def on_upd_text(self, patch):
        self.doc.apply_patch(patch)

        cursor = self.textCursor()
        old_pos = cursor.position()
        self.setPlainText(self.doc.text)
        cursor.setPosition(old_pos)
        self.setTextCursor(cursor)

apply_patch根据输入值实现插入或删除字符

    def apply_patch(self, patch: str) -> None:
        json_char = json.loads(patch)
        op = json_char["op"]

        if op == self.PATCH_INSERT_TOKEN:
            char = Char(json_char["char"], Position(
                json_char["pos"], json_char["sites"]), json_char["clock"])
            self._doc.add(char)
        elif op == self.PATCH_DELETE_TOKEN:
            char = next(c for c in self._doc if
                        c.pos.pos == json_char["pos"] and
                        c.pos.sites == json_char["sites"] and
                        c.clock == json_char["clock"]
                        )
            self._doc.remove(char)

4. json广播实现同步

主要用json字符串来传输文本编辑器的改变情况,如位置,增/删等。

    #采用json字符串来接收插入消息patch topic上的监听事件
    def on_topic_patch(self, topic, payload):
        if payload not in self.patch_stack:
            payload_decrypted = self.fernet.decrypt(payload)
            patch = json.loads(payload_decrypted)
            if patch["src"] != self.site:
                print(f"Received patch: {payload_decrypted.decode()}")
                self.patch_stack.append(payload)
                self.editor.upd_text.emit(payload_decrypted.decode())

5. 字符存储方法

定义Doc类来存储Char类的列表,里面包括具体字符char,用户site和clock,同时设置存储策略_alloc

class Doc:

    PATCH_INSERT_TOKEN = "i"
    PATCH_DELETE_TOKEN = "d"

    def __init__(self, site: int=0) -> None:
        self._site: int = site

        self._strategy = RandomStrategy()
        self._alloc = Allocator(self._strategy, self.site)

        self._clock: int = 0
        #_doc里面存储排序字符列表,每一个Char里面都包括具体字符,用户site和clock
        self._doc: SortedList["Char"] = SortedList()
        self._doc.add(Char("", Position([0], [-1]), self._clock))
        self._doc.add(Char("", Position([2 ** BASE_BITS - 1], [-1]), self._clock))

插入方法处理:

    def insert(self, pos: int, char: str) -> str:
        self._clock += 1
        p, q = self._doc[pos].pos, self._doc[pos + 1].pos

        new_char = Char(char, self._alloc(p, q), self._clock)
        self._doc.add(new_char)

        return self._serialize(self.PATCH_INSERT_TOKEN, new_char)

存储的文本值的获取,一个一个读取char

    @property
    def text(self) -> str:
        return "".join([c.char for c in self._doc])

position位置类:

#使用树路径标识字符的位置
class Position:

    def __init__(self, pos: List[int]=None, sites: List[int]=None, base_bits: int=0) -> None:
        # Each element is part of the tree path as a var. base digit of size 2^(BASE_BITS + el. no)
        self.pos = pos or []
        self.sites = sites or []
        self.base_bits = base_bits or BASE_BITS

    #类对象实例化之前调用函数
    @classmethod
    def from_int(cls, pos: int, depth: int, sites: List[int], base_bits: int=0) -> "Position":
        new_pos = cls(sites=sites, base_bits=base_bits)

        for _depth in range(depth, 0, -1):
            shift = base_bits + _depth - 1
            # Extract n rightmost bits where n equals the no. of bits at depth `_depth`
            new_pos.pos.insert(0, pos & (1 << shift) - 1)
            pos >>= shift

        return new_pos

    def to_int(self, trim: int=0) -> int:
        #如果平衡了
        if trim:
            workspace = self._ptrim(self.pos, trim)
        else:
            workspace = self.pos

        out = 0
        for _depth, i in enumerate(workspace):
            # Add zeros and place `i` in them
            out = (out << (self.base_bits + _depth)) | int(i)

        return out

    def interval_between(self, other: "Position", depth: int) -> Tuple[int, bool]:
        if self.pos == other.pos and self.sites[-1] != other.sites[-1] and depth > len(self.pos):
            return self.interval_at(depth), True

        return other.to_int(depth) - self.to_int(depth) - 1, False

    def interval_at(self, depth: int) -> int:
        return 2 ** (self.base_bits + depth - 1) - 1

    @staticmethod
    def _ptrim(pos: List[int], depth: int) -> List[int]:
        out = []
        length = len(pos)
        for _depth in range(depth):
            if _depth < length:
                out.append(pos[_depth])
            else:
                out.append(0)

        return out

    def __lt__(self, other):
        # Lexical order by `position` and `site` as tie-breaker
        return list(zip(self.pos, self.sites)) < list(zip(other.pos, other.sites))

    def __str__(self):
        return str(list(zip(self.pos, self.sites)))

设定偏移量为5,即每5个字符为一个单位,到下一行时pos+5

6. 颜色区分实现

可以预先写几个颜色值到COLORS里面,然后对于不同的author选取不同color,并根据输入人来改变文字背景,每应用一个则block_pos+1

#颜色显示,继承QSyntaxHighlighter类
class AuthorHighlighter(QSyntaxHighlighter):

    COLORS = (
        (251, 222, 187),
        (187, 251, 222),
        (222, 251, 187),
        (222, 187, 251),
        (187, 222, 251)
    )

    NUM_COLORS = len(COLORS)

    def __init__(self, parent):
        QSyntaxHighlighter.__init__(self, parent)
        self.parent = parent

    def highlightBlock(self, text):
        curr_line = self.previousBlockState() + 1

        doc_line = 0
        block_pos = 0

        text_format = QTextCharFormat()
        #循环,c为字符,a为用户id(site)
        for c, a in zip(self.parent.doc.text, self.parent.doc.authors[1:-1]):
            if c in ("\n", "\r"):
                doc_line += 1
                continue
            else:
                if doc_line == curr_line:
                    text_format.setBackground(QBrush(self.get_author_color(a), Qt.SolidPattern))

                    self.setFormat(block_pos, 1, text_format)

                    block_pos += 1
                elif doc_line > curr_line:
                    break

        self.setCurrentBlockState(self.previousBlockState() + 1)

7.最终效果

Demo

四. 总结与讨论

  • 由于采用mq进行订阅和转发,所以必须要有至少一个broker在后台运行进行转发
  • 没有采用加锁的设计,在互斥性方面还有提升的空间
  • 界面方面可以添加一些按钮之类提升用户友好型
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容