Skip to content

ChatRoom

一个 python + socket 实现的简单 Cli 版本聊天室

使用

你只需要安装 python3 环境即可运行脚本,项目下有两个包,一个叫做 client ,一个叫做 server。client 是客户端类的封装,server 是服务器类的封装。里面是核心代码。

这里的服务器监听 IP 默认设在本机作为演示,如果你想部署在服务器上需要自己手动更改 IP 。

使用的时候需要先运行服务器程序,运行之后可以看到服务器日志:

[Server] 服务器正在运行......

接着开启客户端程序,客户端将自动连接到服务器程序,使用如下指令登录:

login '用户名'

输入该指令之后便可以开始聊天了,使用如下指令发送讯息:

send '消息'

发送之后,服务器将会自动将你的消息转发到所有在线的客户端,客户端收到消息后会自动显示,这样就完成了聊天室的功能。

功能设计

server(服务器)

服务器不参与会话,只提供服务。

服务器需求:

  1. 监听客户端的链接

  2. 监听客户端的信息

  3. 将信息广播给所有人

client(客户端)

需求:

  1. 登录到服务器
  2. 发送信息给所有人

数据通信格式

登录

登录数据格式

{
    "type": "login",  # 请求类型
    "nickname": "zhengxin"  # 用户名
}

登录结果

{
    "status": "ok",  # 请求状态
    "id": 1  # 服务器分配的用户id
}

信息交互

发送信息

{
    'type': 'broadcast',  # 用户发送信息类型
    'sender_id': 1,  # 发送信息的用户id
    'message': 'message'  # 用户发送的信息
}

服务器广播

{
    'sender_id': 1,  # 发送信息的人
    'sender_nickname': 'zhengxin',  # 用户名
    'message': "hello wrold !"  # 用户发送的信息
}

案例源码

py
import socket
import threading
import json


class Server:
    """服务器类"""

    def __init__(self):
        """构造"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 链接列表
        self.connections = list()
        # 称呼列表
        self.nicknames = list()

    def start(self, address):
        """启动服务器"""
        # 绑定端口
        self.socket.bind(address)
        # 启用监听
        self.socket.listen(10)
        print("[Server] 服务器正在运行......")

        # 添加管理员账号
        self.connections.append(None)
        self.nicknames.append("System")

        # 开始侦听
        while True:
            # 接收连接
            connection, address = self.socket.accept()
            print("[Server] 收到一个新连接", connection.getsockname(), connection.fileno())
            # 开启新的线程,尝试接受数据
            threading.Thread(target=self.handle_login, args=(connection,), daemon=True).start()

    def handle_login(self, connection):
        # 尝试接受数据
        try:
            buffer = connection.recv(1024).decode()
            # 解析成 json 数据
            obj = json.loads(buffer)
            # 如果是连接指令,那么则返回一个新的用户编号,接收用户连接
            if obj["type"] == "login":
                self.connections.append(connection)
                self.nicknames.append(obj["nickname"])

                # 返回 json {'id':编号}
                data = {"id": len(self.connections) - 1}
                data = json.dumps(data).encode()
                connection.send(data)

                # 开辟一个新的线程
                thread = threading.Thread(target=self.user_thread, args=(len(self.connections) - 1,))
                thread.daemon = True
                thread.start()
            else:
                print("[Server] 无法解析 json 数据包:", connection.getsockname(), connection.fileno(), )
        except Exception as e:
            print("[Server] 无法接受数据:", connection.getsockname(), connection.fileno(), e)

    def user_thread(self, user_id):
        """
        用户子线程
        :param user_id: 用户 id
        """
        # 获取用户链接
        connection = self.connections[user_id]
        # 获取用户名字
        nickname = self.nicknames[user_id]

        msg = f'用户 {nickname} ({user_id}) 加入聊天室'
        print(f'[Server] {msg}')

        # 广播一条信息
        self.broadcast(message=msg)

        # 侦听用户发来的信息
        while True:
            try:
                buffer = connection.recv(1024).decode()
                # 解析成 json 数据
                obj = json.loads(buffer)
                # 如果是广播指令
                if obj["type"] == "broadcast":
                    self.broadcast(obj["sender_id"], obj["message"])
                else:
                    print(f'[Server] 无法解析 json 数据包: {connection.getsockname()} {connection.filename()}')

            except Exception as e:
                print(f'[Server] 连接失效: {connection.getsockname()} {connection.fileno()}', e)
                self.connections[user_id].close()
                self.connections[user_id] = None
                self.nicknames[user_id] = None

    def broadcast(self, user_id=0, message=""):
        """
        广播
        :param user_id: 用户 id (0 为系统)
        :param message: 广播内容
        """
        for i in range(1, len(self.connections)):
            if user_id != i:
                data = {
                    "sender_id": user_id,
                    "sender_nickname": self.nicknames[user_id],
                    "message": message,
                }
                data = json.dumps(data).encode()
                self.connections[i].send(data)


if __name__ == "__main__":
    server = Server()
    server.start(("0.0.0.0", 8000))
py
import socket
import threading
import json

"""
定义一个客户端类,
    属性:socket、id、name

    行为:
        启动客户端
        帮助信息
        登录
        发送信息
        接收信息
"""


class Client:
    """客户端"""

    intro = """
[Welcome] 简易聊天室客户端(Cli版)
[Help] login nickname - 登录到聊天室,nickname 是你选择的昵称
[Help] send message - 发送消息,message 是你输入的消息
"""

    def __init__(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.id = None
        self.nickname = None

    def start(self, address):
        """启动客户端"""
        self.socket.connect(address)
        print(self.intro)
        while True:
            action = input("").strip()
            if action.lower().startswith("login"):
                self.do_login(action)
            elif action.lower().startswith("send"):
                self.do_send(action)
            else:
                print(self.intro)

    def do_login(self, args):
        """登录聊天室"""
        nickname = args.split()[1]

        # 将昵称发送给服务器,获取用户 id
        data = {"type": "login", "nickname": nickname}
        data = json.dumps(data).encode()
        self.socket.send(data)
        # 尝试接受数据
        try:
            buffer = self.socket.recv(1024).decode()
            obj = json.loads(buffer)
            if obj["id"]:
                self.nickname = nickname
                self.id = obj["id"]
                print("[Client] 成功登录到聊天室")

                # 开启子线程用于接受数据
                thread = threading.Thread(target=self.handle_receive)
                thread.daemon = True
                thread.start()
            else:
                print("[Client] 无法登录到聊天室")
        except Exception as e:
            print("[Client] 无法从服务器获取数据", e)

    def do_send(self, args):
        """发送消息"""
        message = args[5:]
        # 显示自己发送的消息
        print(f'[{self.nickname}({self.id})]')

        # 开启子线程用于发送数据
        thread = threading.Thread(target=self.send_message, args=(message,))
        thread.daemon = True
        thread.start()

    def send_message(self, message):
        """发送消息线程"""
        data = {"type": "broadcast", "sender_id": self.id, "message": message}
        data = json.dumps(data).encode()
        self.socket.send(data)

    def handle_receive(self):
        """接受消息线程"""
        while True:
            try:
                buffer = self.socket.recv(1024).decode()
                obj = json.loads(buffer)
                print(f'[{obj["sender_nickname"]}({obj["sender_id"]})] {obj["message"]}')
            except Exception as e:
                print("[Client] 无法从服务器获取数据", e)


if __name__ == "__main__":
    client = Client()
    client.start(("127.0.0.1", 8000))