ChatRoom
一个 python + socket 实现的简单 Cli 版本聊天室
使用
你只需要安装 python3 环境即可运行脚本,项目下有两个包,一个叫做 client ,一个叫做 server。client 是客户端类的封装,server 是服务器类的封装。里面是核心代码。
这里的服务器监听 IP 默认设在本机作为演示,如果你想部署在服务器上需要自己手动更改 IP 。
使用的时候需要先运行服务器程序,运行之后可以看到服务器日志:
[Server] 服务器正在运行......
接着开启客户端程序,客户端将自动连接到服务器程序,使用如下指令登录:
login '用户名'
输入该指令之后便可以开始聊天了,使用如下指令发送讯息:
send '消息'
发送之后,服务器将会自动将你的消息转发到所有在线的客户端,客户端收到消息后会自动显示,这样就完成了聊天室的功能。
功能设计
server(服务器)
服务器不参与会话,只提供服务。
服务器需求:
监听客户端的链接
监听客户端的信息
将信息广播给所有人
client(客户端)
需求:
- 登录到服务器
- 发送信息给所有人
数据通信格式
登录
登录数据格式
{
"type": "login", # 请求类型
"nickname": "zhengxin" # 用户名
}
登录结果
{
"status": "ok", # 请求状态
"id": 1 # 服务器分配的用户id
}
信息交互
发送信息
{
'type': 'broadcast', # 用户发送信息类型
'sender_id': 1, # 发送信息的用户id
'message': 'message' # 用户发送的信息
}
服务器广播
{
'sender_id': 1, # 发送信息的人
'sender_nickname': 'zhengxin', # 用户名
'message': "hello wrold !" # 用户发送的信息
}
案例源码
py
import socket
import threading
import json
class Server:
"""服务器类"""
def __init__(self):
"""构造"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 链接列表
self.connections = list()
# 称呼列表
self.nicknames = list()
def start(self, address):
"""启动服务器"""
# 绑定端口
self.socket.bind(address)
# 启用监听
self.socket.listen(10)
print("[Server] 服务器正在运行......")
# 添加管理员账号
self.connections.append(None)
self.nicknames.append("System")
# 开始侦听
while True:
# 接收连接
connection, address = self.socket.accept()
print("[Server] 收到一个新连接", connection.getsockname(), connection.fileno())
# 开启新的线程,尝试接受数据
threading.Thread(target=self.handle_login, args=(connection,), daemon=True).start()
def handle_login(self, connection):
# 尝试接受数据
try:
buffer = connection.recv(1024).decode()
# 解析成 json 数据
obj = json.loads(buffer)
# 如果是连接指令,那么则返回一个新的用户编号,接收用户连接
if obj["type"] == "login":
self.connections.append(connection)
self.nicknames.append(obj["nickname"])
# 返回 json {'id':编号}
data = {"id": len(self.connections) - 1}
data = json.dumps(data).encode()
connection.send(data)
# 开辟一个新的线程
thread = threading.Thread(target=self.user_thread, args=(len(self.connections) - 1,))
thread.daemon = True
thread.start()
else:
print("[Server] 无法解析 json 数据包:", connection.getsockname(), connection.fileno(), )
except Exception as e:
print("[Server] 无法接受数据:", connection.getsockname(), connection.fileno(), e)
def user_thread(self, user_id):
"""
用户子线程
:param user_id: 用户 id
"""
# 获取用户链接
connection = self.connections[user_id]
# 获取用户名字
nickname = self.nicknames[user_id]
msg = f'用户 {nickname} ({user_id}) 加入聊天室'
print(f'[Server] {msg}')
# 广播一条信息
self.broadcast(message=msg)
# 侦听用户发来的信息
while True:
try:
buffer = connection.recv(1024).decode()
# 解析成 json 数据
obj = json.loads(buffer)
# 如果是广播指令
if obj["type"] == "broadcast":
self.broadcast(obj["sender_id"], obj["message"])
else:
print(f'[Server] 无法解析 json 数据包: {connection.getsockname()} {connection.filename()}')
except Exception as e:
print(f'[Server] 连接失效: {connection.getsockname()} {connection.fileno()}', e)
self.connections[user_id].close()
self.connections[user_id] = None
self.nicknames[user_id] = None
def broadcast(self, user_id=0, message=""):
"""
广播
:param user_id: 用户 id (0 为系统)
:param message: 广播内容
"""
for i in range(1, len(self.connections)):
if user_id != i:
data = {
"sender_id": user_id,
"sender_nickname": self.nicknames[user_id],
"message": message,
}
data = json.dumps(data).encode()
self.connections[i].send(data)
if __name__ == "__main__":
server = Server()
server.start(("0.0.0.0", 8000))
py
import socket
import threading
import json
"""
定义一个客户端类,
属性:socket、id、name
行为:
启动客户端
帮助信息
登录
发送信息
接收信息
"""
class Client:
"""客户端"""
intro = """
[Welcome] 简易聊天室客户端(Cli版)
[Help] login nickname - 登录到聊天室,nickname 是你选择的昵称
[Help] send message - 发送消息,message 是你输入的消息
"""
def __init__(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.id = None
self.nickname = None
def start(self, address):
"""启动客户端"""
self.socket.connect(address)
print(self.intro)
while True:
action = input("").strip()
if action.lower().startswith("login"):
self.do_login(action)
elif action.lower().startswith("send"):
self.do_send(action)
else:
print(self.intro)
def do_login(self, args):
"""登录聊天室"""
nickname = args.split()[1]
# 将昵称发送给服务器,获取用户 id
data = {"type": "login", "nickname": nickname}
data = json.dumps(data).encode()
self.socket.send(data)
# 尝试接受数据
try:
buffer = self.socket.recv(1024).decode()
obj = json.loads(buffer)
if obj["id"]:
self.nickname = nickname
self.id = obj["id"]
print("[Client] 成功登录到聊天室")
# 开启子线程用于接受数据
thread = threading.Thread(target=self.handle_receive)
thread.daemon = True
thread.start()
else:
print("[Client] 无法登录到聊天室")
except Exception as e:
print("[Client] 无法从服务器获取数据", e)
def do_send(self, args):
"""发送消息"""
message = args[5:]
# 显示自己发送的消息
print(f'[{self.nickname}({self.id})]')
# 开启子线程用于发送数据
thread = threading.Thread(target=self.send_message, args=(message,))
thread.daemon = True
thread.start()
def send_message(self, message):
"""发送消息线程"""
data = {"type": "broadcast", "sender_id": self.id, "message": message}
data = json.dumps(data).encode()
self.socket.send(data)
def handle_receive(self):
"""接受消息线程"""
while True:
try:
buffer = self.socket.recv(1024).decode()
obj = json.loads(buffer)
print(f'[{obj["sender_nickname"]}({obj["sender_id"]})] {obj["message"]}')
except Exception as e:
print("[Client] 无法从服务器获取数据", e)
if __name__ == "__main__":
client = Client()
client.start(("127.0.0.1", 8000))