www.zhblog.net

Python 不可不知的 Socket 通信

Socket API 是用来通过网络传递信息的,也为进程间通信提供一种形式。

最常见的 socket 程序就是 client-server 应用程序,下面在同一主机上进程之间进行通信。

首先,了解 socket 的 server 端和 client 端 api。


Socket API

Python 的 socket 模块提供了 socket api 的接口。

主要的方法:

socket()

bind()

listen()

accept()

connect()

connect_ex()

send()

recv()

close()

python 提供了直接映射到操作系统的 api(底层C),保证了方便与一致性。


TCP Socket

使用 socket.socket() 创建一个 socket 对象,并指定 socket 类型为 socket.SOCK_STREAM,这就默认我们使用的协议为 TCP,在通常情况下,这就是我们想要使用的。


TCP 协议优点:

可靠性:发送者会检测网络传输中是否丢包,并重发在网络中丢掉的数据包。

顺序发送:数据被读取时保证数据是发送者写的顺序。


另外,UDP socket 可以指定为 socket.SOCK_DGRAM,它与 TCP 相反,数据传输不可靠且无序。

下图中,可以明确看出 socket api 调用和 TCP 传输

sockets-tcp-flow.1da426797e37.jpg


图左边表示服务端,右边是客户端。

左边最上面四个方法:

socket():创建 socket

bind():绑定主机和端口

listen():监听客户端连接

accept():当客户端请求连接时,建立连接

客户端调用 connect() 与服务端建立连接并启动三次握手,握手可以确定网络双方是否可以彼此到达。简而言之,服务端可以到达客户端,客户端可以到达服务端。

中间的 send() 和 recv() 就是服务端与客户端进行数据交换。

最下面 close() 关闭各自的 socket。


Server-Client 程序

现在已经了解了 socket API 和 server 与 client 是如何通信的,接下来就做一个简单的实现:服务端返回从客户端接收的任何内容。

服务端 server.py:

import socket

import time





HOST = '127.0.0.1'

PORT = 8888



with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

    s.bind((HOST, PORT))

    s.listen()

    conn, addr = s.accept()



    with conn:

        print('Connected by', addr)

        while True:

            data = conn.recv(1024)

            print(data, 'time:', time.time())

            if not data:

                break

            conn.sendall(data)


socket.socket() 是以 context manager 的形式创建一个 socket 对象,这样不用显示调用 s.close()。传递的参数是地址族(address family)和 socket 类型。AF_INET 表示 IPv4 的网络地址族,SOCK_STREAM 表示是 TCP 协议的 socket。

bind() 用指定的网络接口和端口关联 socket。这个值是依赖 socket 的 address family。在上面例子中,使用 socket.AF_INET(IPv4),所以它接收元组参数(host, port)。

host 可以是域名,IP 地址,或者为空字符串。如果使用 IP 地址,host 应该是一个 IPv4 格式的地址字符串。IP 地址 127.0.0.1 是标准的 IPv4 回送地址回送地址(127.x.x.x)是本机回送地址(Loopback Address),即主机IP堆栈内部的IP地址,主要用于网络软件测试以及本地机进程间通信,无论什么程序,一旦使用回送地址发送数据,协议软件立即返回之,不进行任何网络传输。),所以仅本机程序允许连接服务。如果传递的是空字符串,服务将接收所有能获访问到的 IPv4 接口。如果为域名,则具有不确定性,它的结果依赖 DNS 的解析,可能每次运行程序得到的地址都不一样。

port 是一个 1-65535 范围的数字,表示接收客户端连接的 TCP 端口。通常使用端口大于 1023,1023 内的端口为系统使用预留。

accept() 阻塞并等待连接,当一个客户端连上,方法返回一个新的 socket 对象和客户端的地址信息。如果是 IPv4,则地址信息 (host, port);如果是 IPv6,则地址信息 (host, port, flowinfo, scopeid)。

需要注意的一点,返回的新 socket 对象与原来监听的 socket 是不一样的。新的 socket 对象将用于与客户端的通信。

conn.recv() 为阻塞调用,它读取客户端发送的任何数据,然后被 conn.sendall() 原样返回。

如果 conn.recv() 返回一个空的 bytes 对象:b'',则客户端关闭了连接,然后循环终止。


客户端 client.py

import socket

import time





HOST = '127.0.0.1'

PORT = 8888



with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

    s.connect((HOST, PORT))

    s.sendall(b'Hello, world')

    data = s.recv(1024)

    print(time.time())

print('Received', repr(data))


相较于服务端,客户端比较简单。创建一个 socket 对象,连接上服务端,发送数据,然后从服务端接收数据,并打印。


运行程序,先启动服务 server.py

conn, addr = s.accept()


服务会阻塞并等待,通过命令 netstat -an 可以看到,8888 端口为 LISTENING 状态

TCP    127.0.0.1:8888         0.0.0.0:0              LISTENING


运行客户端 client.py

结果:

server:



Connected by ('127.0.0.1', 9096)

b'Hello, world' time: 1582381399.6712174

b'' time: 1582381399.6722152



client:



1582381399.6722152

Received b'Hello, world'


理解通信

sockets-loopback-interface.jpg


当使用回送地址(IPv4:127.0.0.1;IPv6:::1),数据永远不会离开本机接触外网。如上图所示,回送地址是包含在主机内的,数据间的传输对于主机而已是本地的,所以 IP 地址 127.0.0.1 或 ::1 被称为 “localhost”。

应用程序使用回送地址可以在本机进程内相互通信。由于运行在本机与外网隔离,具有安全性。

如果程序 IP 地址不是使用 127.0.0.1 或 ::1,那么它将绑定到以太网,可以连接外部网络。

sockets-ethernet-interface.jpg


处理多个连接

现在的服务端存在限制,它只能服务一个客户端。

data = s.recv(1024)


上面 bufsize 参数设置为 1024,它表示一次最多接收的数据量,并不意味着 recv() 一定会返回 1024 个字节。

send() 也是类似,它返回发送的字节量,可能低于 bufsize。你需要多次检查以确保发送了所有数据。

我们应该避免去使用 sendall(),这个方法会持续发送数据直到所有数据发送完毕,或错误发生。如果成功返回 None。

到目前为止,明确两点:

1.如何同时处理多个连接

2.调用 send() 和 recv() 需要确保数据

并发有很多种方式,最受欢迎的是异步 I/O,asyncio 库从 Python3.4 引入。传统的选择可以用线程。在这里将使用更古老的系统调用:select()。

seelct() 允许检查多个 socket 完成状态,所以当调用 select() 你可以检查 socket 是否准备读或者写。但是,在 Python 中,鼓励使用标准库中 selectors 模块,它是构建在 select 之上的高效 I/O 多路复用。


多连接服务

我们将使用 selectors 模块创建一个服务,可以处理多个客户端的连接。


multi-server.py

import selectors

import socket

import types





host = '127.0.0.1'

port = 8888



sel = selectors.DefaultSelector()





def accept_wrapper(sock):

    conn, addr = sock.accept()

    print('accepted connection from', addr)

    conn.setblocking(False)

    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')

    events = selectors.EVENT_READ | selectors.EVENT_WRITE

    sel.register(conn, events, data=data)





def service_connection(key, mask):

    sock = key.fileobj

    data = key.data



    if mask & selectors.EVENT_READ:

        recv_data = sock.recv(1024)

        if recv_data:

            data.outb += recv_data

        else:

            print('closing connection to', data.addr)

            sel.unregister(sock)

            sock.close()



    if mask & selectors.EVENT_WRITE:

        if data.outb:

            print('echoing', repr(data.outb), 'to', data.addr)

            sent = sock.send(data.outb)

            data.outb = data.outb[sent:]





def main():

    lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    lsock.bind((host, port))

    lsock.listen()

    lsock.setblocking(False)

    sel.register(lsock, selectors.EVENT_READ, data=None)



    while True:

        events = sel.select(timeout=None)

        for key, mask in events:

            if key.data is None:

                accept_wrapper(key.fileobj)

            else:

                service_connection(key, mask)





if __name__ == '__main__':

    main()


首先,这个 server 与开始的单一 server 最大的不同是调用 lsock.setblocking(False) 设置 socket 非阻塞模式。

sel.register() 注册 socket 被监控的事件,对于 listen socket,我们监控事件:selectors.EVENT_READ。data 是用来存储任何你想附带在 socket 上的数据,它们是绑定在一起的。当调用 select(),data 也会随着 socket 返回,通常用来追踪 socket 发送接收的数据。

lsock.select(timeout=None) 会阻塞,直到有 socket 准备 I/O。它返回一个元组 (key, mask) 列表,每个 socket 有属于自己的 (key, mask)。key 是具名元组,包含 fileobj,是 socket 对象。key.data 是附带在 socket 上的数据。mask 是准备操作的事件。

如果 key.data 为 None,则表示监听的 socket 需要 accept() 连接。连接后返回一个新的 socket 对象,并为新的 socket 注册读和写事件。如果 key.data 不为 None,则表示 socket 已经连接,准备通信。

记住,这个版本的 server 中,socket 都是非阻塞的。如果不设置 blocking 为 False,会造成程序“挂起”。

接下来,socket 准备通信,通信分为读和写两部分。匹配读事件则从客户端读取 1024 字节,存储在 outb 中。若客户端关闭连接,则调用 unregister 不再监控此 socket。匹配写事件则将 outb 发送给客户端,send() 返回实际发送的字节,不保证全部发送,所有要确定未发送的字节以待后续继续发送。


multi-client.py

import socket

import selectors

import types





sel = selectors.DefaultSelector()



messages = [b'Message 1 from client.', b'Message 2 from client.']





def start_connection(host, port, num_conns):

    server_addr = (host, port)

    for i in range(num_conns):

        connid = i + 1

        print('starting connection', connid, 'to', server_addr)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        sock.setblocking(False)

        sock.connect_ex(server_addr)

        event = selectors.EVENT_READ | selectors.EVENT_WRITE

        data = types.SimpleNamespace(connid=connid, msg_total=sum(len(m) for m in messages), recv_total=0, messages=list(messages), outb=b'')

        sel.register(sock, event, data=data)





def service_connection(key, mask):

    sock = key.fileobj

    data = key.data

    if mask & selectors.EVENT_READ:

        recv_data = sock.recv(1024)

        if recv_data:

            print('received', repr(recv_data), 'from connection', data.connid)

            data.recv_total += len(recv_data)

        if not recv_data or data.recv_total == data.msg_total:

            print('closing connection', data.connid)

            sel.unregister(sock)

            sock.close()

    if mask & selectors.EVENT_WRITE:

        if not data.outb and data.messages:

            data.outb = data.messages.pop(0)

        if data.outb:

            print('sending', repr(data.outb), 'to connection', data.connid)

            sent = sock.send(data.outb)

            data.outb = data.outb[sent:]





def main():

    host = '127.0.0.1'

    port = 8888

    num_conns = 3

    start_connection(host, port, num_conns)



    while True:

        events = sel.select(timeout=None)

        for key, mask in events:

            service_connection(key, mask)





if __name__ == '__main__':

    main()


客户端与服务端是非常类似的,它不需要 listen,直接连接。

num_conns 表示创建几个连接服务的客户端。

这里使用了 connect_ex() 来代替 connect(),因为在程序中遇到异常 connect() 会立即抛出 BlockingIOError,而 connect_ex() 返回一个错误指示。

另外,需要注意的是:客户端发送的数据与接收的数据一致时,关闭连接。真实程序不会这样关闭连接,但在这个例子中,服务端只返回客户端发送的内容。


异常

很显然,我们的程序没有处理异常,那只是为了实例简洁直观。现在已经熟悉了基础的 API,非阻塞模式 socket,和 select(),我们将添加异常处理。从 Python3.3 开始,socket 相关的错误抛出 OSError,我们需要捕捉 OSError。另一个相关的异常就是 timeout,timeout 在通信中可以算“正常的”异常,因为太多情况会造成传输中断,软件层、物理层都不是程序决定的。除了这些常见的异常外,程序还可能会遇到 BlockingIOError、OSError、ConnectionResetError、TimeoutError、ConnectionRefusedError。


结尾

socket 的基本通信,和非阻塞多客户端通信是通信中很基础,但很常见的问题。很多大的应用程序都是基于此,加上通信协议,所以很有必要理解 socket 通信。网络通信是个非常大的课题,如果有兴趣可以基于这些基础,或更多的时间去研究。


 

展开阅读全文

评论

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 心情