|
服务器端
- import socket
- import threading
- qq_port = 5566
- user_list = {}
- index_content = '''
- HTTP/1.1 200 ok
- Content-Type: text/html
- '''
- def qq_server():#建立socket套接字
- qq_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- qq_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
- qq_server.bind(("", qq_port))
- qq_server.listen(128)
- return qq_server
- def new_server(new_qq_ip):#建立客户端被动链接
- # new_qq_server.send("链接以建立,可以开始交谈".encode("utf-8"))
- print("用户%s访问"%str(new_qq_ip))
- while True:
- qq_txt = new_qq_server.recv(102400).decode("utf-8")
- try:
- if "GET" in qq_txt:
- with open("./test.txt","r") as f:
- txt = f.read()
- content = index_content + txt
- new_qq_server.sendall(content.encode("utf-8"))
- except:
- print(qq_txt)
- with open("./test.txt","a") as f:
- f.write(qq_txt+"\r<br>")
- new_qq_server.close()
- if __name__ == "__main__":
- qq_server = qq_server()
- while True:
- new_qq_server, new_qq_ip = qq_server.accept()
- new = threading.Thread(target=new_server,args=(new_qq_ip,))
- new.start()
- esc_server.close()
复制代码
测试用客户端
- import socket
- if __name__ == "__main__":
- tcp_qq = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- tcp_qq.connect(("192.168.95.1", 5566)) # 目标ip与端口
- while True:
- qq_text = input("输入发送:").encode("utf-8")
- tcp_qq.send(qq_text)
- tcp_qq.close()
复制代码 |
|