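Designing an I/O-bound chat room application raises two questions:
1. Single-threaded or multi-threaded? Single-process or multi-process?
2. Blocking I/O, non-blocking I/O, or I/O multiplexing?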

1.Multi-threaded Blocking I/O

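  1. Wait for incoming connections in a loop.
  2. For each new connection, start a dedicated thread that receives and sends that client's messages.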

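In blocking mode, an operation blocks until it completes or the operating system returns an error (e.g., a connection timeout). While a socket waits for I/O (e.g., in recv or send), the calling thread is blocked until the I/O can proceed or an error occurs. So to serve 1k concurrent connections we need 1k threads: in blocking mode a single thread cannot wait on several connections (or on accept and recv) at the same time.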

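Why this is inefficient:
1. Most threads sit blocked waiting for I/O while the CPU keeps context-switching between them; the more threads there are, the more CPU time is wasted on switching.
2. To get the message handled by thread 1000, we have to go through threads 1-999 first.
3. Every thread has its own stack, so many threads waste memory.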

BlockingEchoServer.py

import socket
import threading

def echo(conn):
    with conn:  # close the socket once the client disconnects
        while True:
            # blocks this thread until data is received
            data = conn.recv(1024)
            if not data:  # b'' means the client closed the connection
                break
            conn.sendall(data)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('localhost', 1234))
    s.listen(1000)
    while True:
        conn, addr = s.accept()  # blocks until a client connects
        # one dedicated thread per connection
        threading.Thread(target=echo, args=(conn,)).start()

BlockingEchoClient.py

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('localhost', 1234))
    while True:
        s.sendall(input().encode('utf-8'))
        print(s.recv(1024))
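
To watch the thread-per-connection model end to end without a terminal, here is a minimal self-contained sketch (an illustration, not the original code). It binds to port 0 instead of 1234 so the OS picks a free port, accepts a fixed number of clients, and counts how many worker threads are alive while every client is still connected. The `accept_clients` and `run_thread_per_connection_demo` helpers and the `n_clients` parameter are hypothetical.

```python
import socket
import threading


def echo(conn):
    # Same echo loop as above: recv() blocks only this connection's thread.
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:  # client closed the connection
                break
            conn.sendall(data)


def accept_clients(server, n_clients, workers):
    # Accept a fixed number of clients, starting one thread per connection.
    for _ in range(n_clients):
        conn, _addr = server.accept()
        worker = threading.Thread(target=echo, args=(conn,), daemon=True)
        worker.start()
        workers.append(worker)


def run_thread_per_connection_demo(n_clients=5):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(('127.0.0.1', 0))  # port 0: let the OS choose a free port
        server.listen(n_clients)
        port = server.getsockname()[1]

        workers = []
        acceptor = threading.Thread(target=accept_clients, args=(server, n_clients, workers))
        acceptor.start()
        clients = [socket.create_connection(('127.0.0.1', port)) for _ in range(n_clients)]
        acceptor.join()

        replies = []
        for i, client in enumerate(clients):
            client.sendall(f'hello {i}'.encode())
            replies.append(client.recv(1024))

        # Every worker is still blocked in recv(): one live thread per open connection.
        alive = sum(worker.is_alive() for worker in workers)

        for client in clients:
            client.close()  # each worker's recv() now returns b'' and the thread exits
        for worker in workers:
            worker.join()
    return replies, alive
```

With 1k clients the same code would hold 1k threads, each mostly idle in recv().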

2.Single-threaded Non-blocking I/O

In non-blocking mode, an operation that cannot complete immediately fails right away instead of waiting.

1. Create a list of sockets.
2. Loop over the list, and for each non-blocking socket try to read and write. These calls return immediately, even when the I/O is not ready.

The server socket that accepts new connections is non-blocking as well.

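Control flow is driven by the BlockingIOError exception: when a connection is closed, recv returns b"" (empty bytes); when no data is available, recv raises BlockingIOError. Closed connections are collected during the pass over the sockets and removed after it, because a set cannot change size while it is being iterated.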
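
Both outcomes can be observed directly on a connected pair of sockets. A minimal sketch, assuming `socket.socketpair()` as a stand-in for a real client/server connection; the short `select.select()` waits only make sure the data (or the close) has arrived before each non-blocking recv().

```python
import select
import socket


def nonblocking_recv_outcomes():
    a, b = socket.socketpair()
    a.setblocking(False)
    outcomes = []

    # 1. Nothing has been sent yet: recv() cannot complete, so it raises immediately.
    try:
        a.recv(1024)
    except BlockingIOError:
        outcomes.append('BlockingIOError')

    # 2. Data is available: recv() returns it.
    b.sendall(b'hi')
    select.select([a], [], [], 1.0)  # wait at most 1 s until a is readable
    outcomes.append(a.recv(1024))

    # 3. The peer closed the connection: recv() returns b'' (end of stream).
    b.close()
    select.select([a], [], [], 1.0)
    outcomes.append(a.recv(1024))

    a.close()
    return outcomes
```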

BusyWaitNonBlockingEchoServer.py

import socket

sockets = set()
remove_pending = set()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind(('localhost', 2002))
    server.listen(1000)
    server.setblocking(False)      # set Non-Blocking socket
    while True:
        try:
            conn, addr = server.accept()
            conn.setblocking(False)
            sockets.add(conn)
        except BlockingIOError:  # EAGAIN "Resource temporarily unavailable" (errno 35 on macOS, 11 on Linux): no pending connection
            pass

        remove_pending.clear()
        for conn in sockets:
            try:
                data = conn.recv(1024)
                if not data:  # connection closed
                    remove_pending.add(conn)
                else:
                    print(data)
                    conn.sendall(data)  # can also raise BlockingIOError if the send buffer is full
            except BlockingIOError:  # recv/send returned without data
                pass

        # remove and close closed connections (the set can't change while being iterated)
        for conn in remove_pending:
            sockets.remove(conn)
            conn.close()

BusyWaitNonBlockingEchoClient.py

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('localhost', 2002))
    while True:
        s.sendall(input().encode('utf-8'))
        print(s.recv(1024))

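Drawbacks:
1. We have to loop over every socket regardless of its state, making at least one system call per socket on each pass. With 10k sockets, finding a single message means walking through all of them, so newly arrived messages wait while precious time is spent checking socket states.
2. Most of the time goes into polling each socket's state. This burns CPU continuously: about 99% of the time is spent polling instead of running business logic.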
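
A rough way to feel the cost of busy-waiting: with nothing to read, count how many recv() calls a polling loop issues during a short window, and compare that with a single select() call that simply sleeps for the same window. A sketch using socketpair() and an arbitrary 50 ms window (the `busy_wait_vs_select` helper is hypothetical); the exact count depends on the machine.

```python
import select
import socket
import time


def busy_wait_vs_select(window=0.05):
    a, b = socket.socketpair()
    a.setblocking(False)

    # Busy-wait: keep calling recv() although no data ever arrives.
    wasted_calls = 0
    deadline = time.monotonic() + window
    while time.monotonic() < deadline:
        try:
            a.recv(1024)
        except BlockingIOError:
            wasted_calls += 1  # a system call that did no useful work

    # Multiplexing: one select() call sleeps in the kernel until the timeout.
    readable, _, _ = select.select([a], [], [], window)

    a.close()
    b.close()
    return wasted_calls, readable
```

The polling loop typically makes thousands of useless calls in that window, while select() makes one.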

3.Single-threaded Synchronous I/O Multiplexing

SYNOPSIS

#include <sys/select.h>
int
select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);

DESCRIPTION
select() and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform a corresponding I/O operation (e.g., read(2), or a sufficiently small write(2)) without blocking.

select() examines the I/O descriptor sets whose addresses are passed in readfds, writefds, and errorfds to see if some of their descriptors are ready for reading, are ready for writing, or have an exceptional condition pending, respectively. On return, select() replaces the given descriptor sets with subsets consisting of those descriptors that are ready for the requested operation. select() returns the total number of ready descriptors in all the sets.

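select is a system call that monitors the state of a set of file descriptors and returns when any of them become readable, writable, or have an error condition.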

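readfds: the set of file descriptors watched for readability; select returns those that have become readable.
writefds: the set of file descriptors watched for writability; select returns those that have become writable.
errorfds: the set of file descriptors watched for exceptional conditions; select returns those that have one pending.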

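The select call blocks until one of the requested events (readable/writable/exceptional) occurs on a watched descriptor, or until the timeout expires.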
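
Python exposes this system call as `select.select(rlist, wlist, xlist, timeout)`, which accepts socket objects (anything with a `fileno()`) and returns the three subsets that are ready. A small sketch on a socketpair() (the `select_readiness` helper is hypothetical): a fresh socket is writable but not readable, and becomes readable once the peer sends something.

```python
import select
import socket


def select_readiness():
    a, b = socket.socketpair()

    # timeout=0 turns select() into a non-blocking poll.
    readable, writable, _ = select.select([a], [a], [a], 0)
    before = (a in readable, a in writable)

    b.sendall(b'ping')
    # Blocks for at most 1 second until a is readable.
    readable, _, _ = select.select([a], [], [], 1.0)
    after = a in readable

    data = a.recv(1024)
    a.close()
    b.close()
    return before, after, data
```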

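select has limits of its own: it can only handle file descriptors below FD_SETSIZE (usually 1024), and every call scans all watched descriptors, so its time complexity is O(n). The multiplexing mechanisms available on different platforms:
1. select: all platforms
2. poll: most POSIX platforms
3. epoll: Linux; stores the watched file descriptors in a red-black tree
4. kqueue: FreeBSD, macOS
5. IOCP (Input/Output Completion Port): Windows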

4.Python Echo Server and Client Implementation, using Selectors

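In the Python standard library, the low-level modules for I/O multiplexing are select and selectors (asyncio's SelectorEventLoop is built on selectors). selectors is the higher-level wrapper:
1. It picks the best mechanism available on the platform: epoll|kqueue|devpoll > poll > select.
2. It defines the BaseSelector base class, through which file objects are registered/unregistered for read/write events, so you don't have to keep track of the read/write descriptor sets yourself.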
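
A minimal sketch of that bookkeeping with the selectors API (`register`, `modify`, `get_key`, `get_map`, `unregister`). The socketpair() and the string payloads stored in `data` are placeholders for real connections and callbacks, and `registration_bookkeeping` is a hypothetical helper.

```python
import selectors
import socket


def registration_bookkeeping():
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()

    # register() records the file object, the events of interest and arbitrary data.
    sel.register(a, selectors.EVENT_READ, data='on_read')
    registered = len(sel.get_map())

    # modify() changes the watched events and/or the attached data.
    sel.modify(a, selectors.EVENT_READ | selectors.EVENT_WRITE, data='on_read_write')
    key = sel.get_key(a)  # SelectorKey(fileobj, fd, events, data)

    sel.unregister(a)
    remaining = len(sel.get_map())

    sel.close()
    a.close()
    b.close()
    return registered, key.events, key.data, remaining
```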

selectors.py (excerpt from the Python standard library)

# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
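
To check which of these classes the current interpreter provides, and which one `DefaultSelector` resolved to, the module attributes can be inspected directly. A small sketch; the candidate list simply mirrors the preference order in the excerpt, and both helper functions are hypothetical.

```python
import selectors

# Same preference order as the excerpt: kqueue/epoll/devpoll > poll > select.
CANDIDATES = ['KqueueSelector', 'EpollSelector', 'DevpollSelector',
              'PollSelector', 'SelectSelector']


def available_selectors():
    # Which classes exist depends on the OS (e.g. EpollSelector only on Linux).
    return [name for name in CANDIDATES if hasattr(selectors, name)]


def default_selector_name():
    # DefaultSelector is an alias for one of the classes above.
    return selectors.DefaultSelector.__name__


if __name__ == '__main__':
    print(available_selectors(), default_selector_name())
```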

Python Echo Server

import selectors
import socket

sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

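First create a selector, then create a server socket, bind it to port 1234, start listening, and put it in non-blocking mode. sel.register(sock, selectors.EVENT_READ, accept) watches the server socket: when it becomes readable, accept is called.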

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

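The while loop blocks in sel.select(), which returns once a monitored file object becomes readable or writable; for each event we then call the callback that was stored in key.data at registration time, passing it the file object and the event mask.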
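
One pass of that loop can be tried in isolation. In this sketch the `run_once` helper, the socketpair() and the `on_readable` callback are illustrative, not part of the original: the callback is stored as the registration's `data` and invoked with `(fileobj, mask)`, and passing a timeout makes `select()` return an empty list when nothing is ready instead of blocking forever.

```python
import selectors
import socket


def run_once(sel, timeout):
    # One iteration of the event loop: wait for events, then dispatch callbacks.
    events = sel.select(timeout)
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)
    return len(events)


def dispatch_demo():
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    a.setblocking(False)
    received = []

    def on_readable(conn, mask):
        received.append(conn.recv(1024))

    sel.register(a, selectors.EVENT_READ, on_readable)
    idle = run_once(sel, timeout=0)      # nothing sent yet: no events
    b.sendall(b'event!')
    busy = run_once(sel, timeout=1.0)    # a is now readable: on_readable runs

    sel.close()
    a.close()
    b.close()
    return idle, busy, received
```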

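When a new client connects, select reports the server socket as readable and accept is called. accept accepts the new connection, which produces a new conn socket, sets that socket to non-blocking mode, and registers it with the selector so that read is called whenever the socket becomes readable.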

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()
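
Put together, these pieces form a complete server. Below is a self-contained sketch that can be run and tested in one process: the setup, accept and read follow the snippets above (minus the prints), while the `stop` event, the 0.1 s select() timeout, the background thread and the `make_echo_server`/`serve`/`echo_roundtrip` helpers are hypothetical additions for testing only. Port 0 lets the OS pick a free port.

```python
import selectors
import socket
import threading


def make_echo_server(host='127.0.0.1', port=0):
    sel = selectors.DefaultSelector()

    def accept(sock, mask):
        conn, addr = sock.accept()  # Should be ready
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, read)

    def read(conn, mask):
        data = conn.recv(1000)  # Should be ready
        if data:
            conn.send(data)  # Hope it won't block
        else:
            sel.unregister(conn)
            conn.close()

    sock = socket.socket()
    sock.bind((host, port))
    sock.listen(100)
    sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ, accept)
    return sel, sock


def serve(sel, stop):
    # The event loop from above, with a timeout so it can notice `stop`.
    while not stop.is_set():
        for key, mask in sel.select(timeout=0.1):
            callback = key.data
            callback(key.fileobj, mask)


def echo_roundtrip(messages):
    sel, sock = make_echo_server()
    stop = threading.Event()
    loop = threading.Thread(target=serve, args=(sel, stop))
    loop.start()

    replies = []
    with socket.create_connection(sock.getsockname()) as client:
        for msg in messages:
            client.sendall(msg)
            replies.append(client.recv(1024))

    stop.set()
    loop.join()
    # Close whatever is still registered (the listening socket, maybe a client).
    for key in list(sel.get_map().values()):
        key.fileobj.close()
    sel.close()
    return replies
```

One thread runs the whole server; the second thread exists only so the test client can live in the same process.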

Python Echo Client

# Echo client program
import socket

HOST = 'localhost'        # The remote host
PORT = 1234               # The same port as used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    while True:
        msg = input("Enter msg: ").encode('utf8')
        s.sendall(msg)

        data = s.recv(1024)
        print('Received: ', data.decode('utf-8'))

The client connects to the server, sends a message, and waits for the server's response; recv blocks until the response arrives.

EchoClient.py output

Enter msg: Hello
Received:  Hello
Enter msg: How r u?
Received:  How r u?
Enter msg: Great, Thanks for asking.
Received:  Great, Thanks for asking.

EchoServer.py output

accepted <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)> from ('127.0.0.1', 65167)
echoing b'Hello' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>
echoing b'How r u?' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>
echoing b'Great, Thanks for asking.' to <socket.socket fd=7, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 1234), raddr=('127.0.0.1', 65167)>

5.Python Chat Server and Client Implementation using Selectors

Python Chat Server

class ChatServer:

    def __init__(self, **kwargs):
        self._selector = selectors.DefaultSelector()
        self._connections_msg_queue = {}
        self._host = kwargs['host']
        self._port = kwargs['port']
Accepting a New Connection
    def run(self):
        # create and register server socket for reading (accepting new connections)
        server_sock = socket.socket()
        server_sock.bind((self._host, self._port))
        server_sock.listen(SERVER_NUM_CONNECTIONS)
        server_sock.setblocking(False)
        self._selector.register(server_sock, selectors.EVENT_READ, self._accept)

        while True:
            events = self._selector.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)
Adding a New Connection
    def _accept(self, sock, mask):
        conn, addr = sock.accept()
        self._add_connection(conn)

    def _add_connection(self, conn):
        # register new client connection for reading (accepting new messages)
        print(f'{conn.getpeername()} hello!')
        self._connections_msg_queue[conn] = collections.deque()
        conn.setblocking(False)
        self._selector.register(conn, selectors.EVENT_READ, self._read)
Receiving a New Message from a Connection
    def _read(self, conn, mask):
        self._read_message(conn)

    def _read_message(self, conn):
        data = conn.recv(BUFFER_SIZE)  # Should be ready
        if data:
            self._add_message(conn, data)
        else:
            self._remove_connection(conn)

    def _add_message(self, sender_conn, raw_msg):
        try:
            msg = json.loads(raw_msg)
            message = Message(msg['user'], msg['text'])
            print(f"{sender_conn.getpeername()}: [{msg['user']}] {msg['text']}")
        except (ValueError, KeyError, TypeError) as e:  # ValueError covers json.JSONDecodeError
            print(f"We got unknown type of message: {raw_msg}; error: {e}")
            return

        # register every client connection for writing (broadcast recent messages)
        for conn, messages in self._connections_msg_queue.items():
            conn.setblocking(False)  # redundant: conn was already made non-blocking in _add_connection
            self._selector.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write)
            messages.append(message)

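Every new message has to reach every client, so we iterate over all connected clients, append the message to each client's queue, and modify its selector registration to watch EVENT_WRITE as well, so that the message is sent once the socket becomes writable.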
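
The fan-out itself can be understood without sockets: each connection owns its own deque, a new message is appended to every deque (the sender's included, as in `_add_message`), and each queue is drained independently when its socket becomes writable. A socket-free sketch; the `broadcast` and `drain` helpers and the string keys standing in for connections are hypothetical.

```python
import collections

Message = collections.namedtuple('Message', ['user', 'text'])


def broadcast(queues, message):
    # Mirror of the loop in _add_message: one append per connected client.
    for messages in queues.values():
        messages.append(message)


def drain(messages):
    # Mirror of _write_pending_messages: pop in FIFO order and encode each frame.
    frames = []
    while messages:
        msg = messages.popleft()
        frames.append(f'[{msg.user}] {msg.text}'.encode('utf-8'))
    return frames


queues = {'alice': collections.deque(), 'bob': collections.deque()}
broadcast(queues, Message('alice', 'hi'))
broadcast(queues, Message('bob', 'hello'))
```

Because the queues are independent, a client whose socket is not writable yet simply keeps its backlog, and the other clients are not held up.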

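    def _read_write(self, conn, mask):
        if mask & selectors.EVENT_READ:
            self._read(conn, mask)
        # the read may have closed conn (client left); only write if it is still connected
        if mask & selectors.EVENT_WRITE and conn in self._connections_msg_queue:
            self._write(conn, mask)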
Broadcasting recent messages to all connections
    def _write(self, conn, mask):
        self._write_pending_messages(conn)

    def _write_pending_messages(self, conn):
        messages = self._connections_msg_queue[conn]
        while messages:
            msg = messages.popleft()
            try:
                # send() may write only part of the bytes; partial writes are ignored here
                conn.send(f'[{msg.user}] {msg.text}'.encode('utf-8'))
            except Exception as e:
                print('Error occurred', e)
                self._remove_connection(conn)
                return

        # no more messages to send: stop watching for writability
        conn.setblocking(False)  # redundant: conn was already made non-blocking in _add_connection
        self._selector.modify(conn, selectors.EVENT_READ, self._read)

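Once the queue is empty, we modify the registration again so that the selector watches the socket for readability only. Otherwise select would keep reporting the idle socket as writable and the loop would spin.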
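
Why that last `modify()` matters: a connected socket with free space in its send buffer is reported writable on every `select()` call, so leaving EVENT_WRITE registered on an idle connection makes the loop wake up immediately, over and over. A sketch using socketpair() (an illustration with a hypothetical `writable_wakeups` helper, not part of the chat server):

```python
import selectors
import socket


def writable_wakeups(rounds=3):
    sel = selectors.DefaultSelector()
    a, b = socket.socketpair()
    sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)

    # Nothing to send, yet every poll reports the socket as writable.
    wakeups = 0
    for _ in range(rounds):
        for key, mask in sel.select(timeout=0):
            if mask & selectors.EVENT_WRITE:
                wakeups += 1

    # After dropping write interest, an idle socket produces no events.
    sel.modify(a, selectors.EVENT_READ)
    idle_events = sel.select(timeout=0)

    sel.close()
    a.close()
    b.close()
    return wakeups, idle_events
```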

Full code of the chat server

import argparse
import collections
import json
import selectors
import socket


SERVER_NUM_CONNECTIONS = 1000
BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatServer:

    def __init__(self, **kwargs):
        self._selector = selectors.DefaultSelector()
        self._connections_msg_queue = {}
        self._host = kwargs['host']
        self._port = kwargs['port']

    ##### SELECT FUNCTIONS ########################

    def _accept(self, sock, mask):
        conn, addr = sock.accept()
        self._add_connection(conn)

    def _read_write(self, conn, mask):
        if mask & selectors.EVENT_READ:
            self._read(conn, mask)
        # the read may have closed conn (client left); only write if it is still connected
        if mask & selectors.EVENT_WRITE and conn in self._connections_msg_queue:
            self._write(conn, mask)

    def _read(self, conn, mask):
        self._read_message(conn)

    def _write(self, conn, mask):
        self._write_pending_messages(conn)

    ##### CHAT FUNCTIONS ########################

    def _add_connection(self, conn):
        # register new client connection for reading (accepting new messages)
        print(f'{conn.getpeername()} hello!')
        self._connections_msg_queue[conn] = collections.deque()
        conn.setblocking(False)
        self._selector.register(conn, selectors.EVENT_READ, self._read)

    def _remove_connection(self, conn):
        print(f'{conn.getpeername()} bye bye!')
        self._selector.unregister(conn)
        conn.close()
        del self._connections_msg_queue[conn]

    def _read_message(self, conn):
        data = conn.recv(BUFFER_SIZE)  # Should be ready
        if data:
            self._add_message(conn, data)
        else:
            self._remove_connection(conn)

    def _add_message(self, sender_conn, raw_msg):
        try:
            msg = json.loads(raw_msg)
            message = Message(msg['user'], msg['text'])
            print(f"{sender_conn.getpeername()}: [{msg['user']}] {msg['text']}")
        except (ValueError, KeyError, TypeError) as e:  # ValueError covers json.JSONDecodeError
            print(f"We got unknown type of message: {raw_msg}; error: {e}")
            return

        # register every client connection for writing (broadcast recent messages)
        for conn, messages in self._connections_msg_queue.items():
            conn.setblocking(False)  # redundant: conn was already made non-blocking in _add_connection
            self._selector.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self._read_write)
            messages.append(message)

    def _write_pending_messages(self, conn):
        messages = self._connections_msg_queue[conn]
        while messages:
            msg = messages.popleft()
            try:
                # send() may write only part of the bytes; partial writes are ignored here
                conn.send(f'[{msg.user}] {msg.text}'.encode('utf-8'))
            except Exception as e:
                print('Error occurred', e)
                self._remove_connection(conn)
                return

        # no more messages to send: stop watching for writability
        conn.setblocking(False)  # redundant: conn was already made non-blocking in _add_connection
        self._selector.modify(conn, selectors.EVENT_READ, self._read)

    def run(self):
        # create and register server socket for reading (accepting new connections)
        server_sock = socket.socket()
        server_sock.bind((self._host, self._port))
        server_sock.listen(SERVER_NUM_CONNECTIONS)
        server_sock.setblocking(False)
        self._selector.register(server_sock, selectors.EVENT_READ, self._accept)

        while True:
            events = self._selector.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Chat server arguments.')
    parser.add_argument('-host', nargs='?', default='localhost')
    # type=int: without it, a port passed on the command line arrives as a string
    parser.add_argument('-port', nargs='?', type=int, default=1234)
    args = parser.parse_args()

    chat = ChatServer(**vars(args))
    chat.run()

Python Chat Client

The client's selector monitors stdin as well as the server socket.
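
Besides sockets, a selector can watch other file descriptors such as a terminal or a pipe, which is what lets the client wait on the keyboard and the server at the same time. A POSIX-only sketch (on Windows, select() accepts sockets only) in which an `os.pipe()` stands in for stdin; the `pipe_as_stdin` helper is hypothetical.

```python
import os
import selectors


def pipe_as_stdin():
    sel = selectors.DefaultSelector()
    read_fd, write_fd = os.pipe()
    # An integer file descriptor can be registered just like a socket.
    sel.register(read_fd, selectors.EVENT_READ, data='stdin')

    before = sel.select(timeout=0)          # nothing "typed" yet
    os.write(write_fd, b'hello\n')          # simulate the user pressing Enter
    ready = [key.data for key, mask in sel.select(timeout=1.0)]
    line = os.read(read_fd, 1024)

    sel.unregister(read_fd)
    sel.close()
    os.close(read_fd)
    os.close(write_fd)
    return before, ready, line
```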

import argparse
import collections
import json
import selectors
import socket
import sys


BUFFER_SIZE = 1000

Message = collections.namedtuple('Message', ['user', 'text'])


class ChatClient:

    def __init__(self, **kwargs):
        self._selector = selectors.DefaultSelector()
        self._sock = None
        self._host = kwargs['host']
        self._port = kwargs['port']
        self._name = kwargs['username'] or input('Enter username:')
        self._running = True

    def _read_stdin(self, stdin, mask):
        line = sys.stdin.readline()
        if not line:  # EOF (e.g. Ctrl-D): stop watching stdin, it would stay readable forever
            self._selector.unregister(sys.stdin)
            return
        data = line.strip()
        if data:
            msg = json.dumps({'user': self._name, 'text': data}, ensure_ascii=False).encode('utf8')
            self._sock.send(msg)  # We should wait for selector here, but it will work.

    def _read_msg(self, conn, mask):
        data = conn.recv(BUFFER_SIZE)  # Should be ready
        if data:
            print(data.decode("utf-8"))
        else:
            print('Connection to server has failed')
            self._running = False

    def run(self):
        # Registering sys.stdin works on POSIX only; on Windows, select() supports sockets only
        self._selector.register(sys.stdin, selectors.EVENT_READ, self._read_stdin)

        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.connect((self._host, self._port))
        self._sock.setblocking(False)
        self._selector.register(self._sock, selectors.EVENT_READ, self._read_msg)

        while self._running:
            events = self._selector.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj, mask)

        self._selector.close()
        self._sock.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Chat client arguments.')
    parser.add_argument('-host', nargs='?', default='localhost')
    # type=int: without it, a port passed on the command line arrives as a string
    parser.add_argument('-port', nargs='?', type=int, default=1234)
    parser.add_argument('-username', nargs='?')
    args = parser.parse_args()

    chat = ChatClient(**vars(args))
    chat.run()

6.Reactor Design Pattern

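7.Other Applications

libuv

The non-blocking I/O engine behind Node.js

nginx

Each nginx worker process (single-threaded) runs a Reactor-style event loop to handle I/O tasks.

Twisted

An event-driven networking engine for Python; it can be used to build many kinds of servers (HTTP, mail, pub/sub).

Source: https://hila.sh/2019/12/28/reactor.html

Related links: https://infloop.tw/2015/02/19/reactor-pattern-in-python/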

https://infloop.tw/2015/02/14/nonblocking-io-and-io-multiplexing/