澳门新浦京手机版Python高手之路【十三】socket网络编程,pythonsocket

本文由码农网 –
王国峰原创翻译,转载请看清文末的转载要求,欢迎参与我们的付费投稿计划!

1、客户端/服务器架构

  什么是客户端/服务器架构?对于不同的人来说,它意味着不同的东西,这取决于你问谁以及描述的是软件还是硬件系统。在这两种情况中的任何一种下,前提都很简单:服务器就是一系列硬件或软件,为一个或多个客户端(服务的用户)提供所需的“服务”。它存在唯一目的就是等待客户端的请求,并响应它们(提供服务),然后等待更多请求。另一方面,客户端因特定的请求而联系服务器,并发送必要的数据,然后等待服务器的回应,最后完成请求或给出故障的原因。服务器无限地运行下去,并不断地处理请求;而客户端会对服务进行一次性请求,然后接收该服务,最后结束它们之间的事务。客户端在一段时间后可能会再次发出其他请求,但这些都被当作不同的事务。

澳门新浦京手机版 1

Python高手之路【十三】socket网络编程,pythonsocket

想要构建聊天应用,或者甚至是游戏吗?那么,socket服务器将成为你迈出的第一步。一旦你了解了创建服务器的基本功能,那么后续的优化步骤就会变得同样简单。

2、客户端/服务器编程

什么是客户/服务器架构?

什么是客户/服务器架构?不同的人有不同的答案。这要看你问的是什么人,以及指的是软件系统还是硬件系统了。但是,有一点是共通的:服务器是一个软件或硬件,用于提供客户需要的“服务”。服务器存在的唯一目的就是等待客户的请求,给这些客户服务,然后再等待其它的请求。另一方面,客户连上一个(预先已知的)服务器,提出自己的请求,
发送必要的数据,然后就等待服务器的完成请求或说明失败原因的反馈。服务器不停地处理外来的请求,而客户一次只能提出一个服务的请求,等待结果。然后结束这个事务。客户之后也可以再提出其它的请求,只是,这个请求会被视为另一个不同的事务了。

澳门新浦京手机版 2

上图就是Internet 上典型的客户/服务器概念。 展示了如今最常见的客户/服务器结构。一个用户或客户电脑通过
Internet 从服务器上取数据。 这的确是一个客户/服务器架构的系统,
但还有更多类似的系统满足客户/服务器架构。而且,客户/服务器架构也可以应用到电脑硬件上。

socket服务器的工作方式是这样的,不间断地运行以等待客户端的连接。一旦客户端连接上了,服务器就会将它添加到客户名单中,然后开始等待来自客户端的消息。

2.1套接字

  套接字的起源可以追溯到20 世纪70
年代,它是加利福尼亚大学的伯克利版本UNIX(称为BSD
UNIX)的一部分。因此,有时你可能会听过将套接字称为伯克利套接字或BSD
套接字。套接字最初是为同一主机上的应用程序所创建,使得主机上运行的一个程序(又名一个进程)与另一个运行的程序进行通信。这就是所谓的进程间通信(Inter
Process
Communication,IPC)。有两种类型的套接字:基于文件的和面向网络的。UNIX
套接字是我们所讲的套接字的第一个家族,并且拥有一个“家族名字”AF_UNIX(又名AF_LOCAL,在POSIX1.g
标准中指定),它代表地址家族(address family):UNIX。包括Python
在内的大多数受欢迎的平台都使用术语地址家族及其缩写AF;其他比较旧的系统可能会将地址家族表示成域(domain)或协议家族(protocol
family),并使用其缩写PF 而非AF。类似地,AF_LOCAL(在2000~2001
年标准化)将代替AF_UNIX。然而,考虑到后向兼容性,很多系统都同时使用二者,只是对同一个常数使用不同的别名。Python
本身仍然在使用AF_UNIX。因为两个进程运行在同一台计算机上,所以这些套接字都是基于文件的,这意味着文件
系统支持它们的底层基础结构。这是能够说得通的,因为文件系统是一个运行在同一主机上的多个进程之间的共享常量

  第二种类型的套接字是基于网络的,它也有自己的家族名字AF_INET,或者地址家族:因特网。另一个地址家族AF_INET6
用于第6
版因特网协议(IPv6)寻址。此外,还有其他的地址家族,这些要么是专业的、过时的、很少使用的,要么是仍未实现的。在所有的地址家族之中,目前AF_INET
是使用得最广泛的。

  Python 2.5 中引入了对特殊类型的Linux
套接字的支持。套接字的AF_NETLINK 家族允许使用标准的BSD
套接字接口进行用户级别和内核级别代码之间的IPC。针对Linux
的另一种特性(Python 2.6
中新增)就是支持透明的进程间通信(TIPC)协议。TIPC
允许计算机集群之中的机器相互通信,而无须使用基于IP 的寻址方式。Python
对TIPC 的支持以AF_TIPC 家族的方式呈现。

客户/服务器网络编程

在完成服务之前,服务器必需要先完成一些设置动作。先要创建一个通讯端点,让服务器能“监听”请求。你可以把我们的服务器比做一个公司的接待员或回答公司总线电话的话务员,一旦电话和设备安装完成,话务员也到了之后,服务就可以开始了。

在网络世界里,基本上也是这样——一旦通讯端点创建好之后,我们在“监听”的服务器就可以进入它那等待和处理客户请求的无限循环中了。当然,
我们也不能忘记在信纸上,杂志里,广告中印上公司的电话号码。否则,就没有人会打电话进来了!同样地,服务器在准备好之后,也要通知潜在的客户,让它们知道服务器已经准备好处理服务了。否则,没有人会提请求的。比方说,你建立了一个全新的网站。这个网站非常的出色,非常的吸引人,非常的有用,是所有网站中最酷的一个。但如果你不把网站的网址或者说统一资源定位符(URL)广而告之的话,没有人会知道这个网站的存在的。这个网站也就永远不见天日了。对于公司
总部的新电话也是这样,你不把电话公之于众,那就没有人会打电话进来。

现在,你对服务器如何工作已经有了一个很好的认识。你已经完成了最难的那一部分。客户端的编程相对服务器端来说就简单得多了。
所有的客户只要创建一个通讯端点,
建立到服务器的连接。然后客户就可以提出请求,请求中,也可以包含必要的数据交互。一旦请求处理完成,客户收到了结果,通讯就结束了。

澳门新浦京手机版 3

2.2套接字地址:主机-端口对

  有效的端口号范围为0~65535(尽管小于1024 的端口号预留给了系统)。

什么是套接字? 

套接字是一种具有之前所说的“通讯端点”概念的计算机网络数据结构。网络化的应用程序在开始任何通讯之前都必需要创建套接字。就像电话的插口一样,没有它就完全没办法通讯。套接字起源于
20 世纪 70 年代加利福尼亚大学伯克利分校版本的 Unix,即人们所说的 BSD
Unix。因此,有时人们也把套接字称为“伯克利套接字”或“BSD
套接字”。一开始,套接字被设计用在同一台主机上多个应用程序之间的通讯。这也被称进程间通讯,或
IPC。套接字有两种,分别是基于文件型的和基于网络型的。

Unix 套接字是我们要介绍的第一个套接字家族。其“家族名”为 AF_UNIX(在
POSIX1.g 标准中也叫 AF_LOCAL),表示“地址家族:UNIX”。包括 Python
在内的大多数流行平台上都使用术语“地址家族”及其缩写“AF”。而老一点的系统中,地址家族被称为“域”或“协议家族”,并使用缩写“PF”而不是“AF”。同样的,AF_LOCAL(在
2000-2001 年被列为标准)将会代替
AF_UNIX。不过,为了向后兼容,很多系统上,两者是等价的。Python
自己则仍然使用
AF_UNIX。由于两个进程都运行在同一台机器上,而且这些套接字是基于文件的。所以,它们的底层结构是由文件系统来支持的。这样做相当有道理,因为,同一台电脑上,文件系统的确是不同的进程都能访问的。

另一种套接字是基于网络的,它有自己的家族名字: AF_INET,或叫“地址家族:
Internet”。 还有一种地址家族 AF_INET6
被用于网际协议第 6 版(IPv6)寻址上。还有一些其它的地址家族,不过,它们要么是只用在某个平台上,要么就是已经被废弃,或是很少被使用,或是根本就还没有实现。所有地址家族中,AF_INET
是使用最广泛的一个。Python 2.5 中加入了一种 Linux
套接字的支持:AF_NETLINK(无连接[见下])套接字家族让用户代码与内核代码之间的
IPC 可以使用标准 BSD
套接字接口。而且,相对之前那些往操作系统中加入新的系统调用,proc
文件系统支持或是“IOCTL”等笨重的方案来说,这种方法显得更为优美,更为安全。

Python 只支持 AF_UNIX,AF_NETLINK,和 AF_INET
家族。由于我们只关心网络编程,所以在本章的大部分时候,我们都只用
AF_INET

不要走开,下面是完整的源代码:

2.3面向连接的套接字

  TCP 套接字,必须使用SOCK_STREAM 作为套接字类型。

套接字地址:主机与端口

如果把套接字比做电话的插口——即通讯的最底层结构,那主机与端口就像区号与电话号码的一对组合。有了能打电话的硬件还不够,你还要知道你要打给谁,往哪打。一个
Internet
地址由网络通讯所必需的主机与端口组成。而且不用说,另一端一定要有人在听才可以。否则,你就会听到熟悉的声音“对不起,您拨的是空号,请查对后再播”。你在上网的时候,可能也见过类似的情况,如“不能连接该服务器。服务器无响应或不可达”。

合法的端口号范围为 0 到 65535。其中,小于 1024
的端口号为系统保留端口。如果你所使用的是 Unix
操作系统,保留的端口号(及其对应的服务/协议和套接字类型)可以通过/etc/services文件获得。常用端口号列表可以从下面这个网站获得:

// Set time limit to indefinite execution
set_time_limit (0);

// Set the ip and port we will listen on
$address = 'localhost';
$port = 10000;
$max_clients = 10;

// Array that will hold client information
$client = Array();

// Create a TCP Stream socket
$sock = socket_create(AF_INET, SOCK_STREAM, 0);
// Bind the socket to an address/port
socket_bind($sock, $address, $port) or die('Could not bind to address');
// Start listening for connections
socket_listen($sock);

echo "Waiting for connections...rn";

// Loop continuously
while (true) {
    // Setup clients listen socket for reading
    $read[0] = $sock;
    for ($i = 0; $i < $max_clients; $i++) {
        if (isset($client[$i]['sock']))
            $read[$i + 1] = $client[$i]['sock'];
    }
    // Set up a blocking call to socket_select()
    if (socket_select($read, $write = NULL, $except = NULL, $tv_sec = 5) < 1)
        continue;
    /* if a new connection is being made add it to the client array */
    if (in_array($sock, $read)) {
        for ($i = 0; $i < $max_clients; $i++) {
            if (empty($client[$i]['sock'])) {
                $client[$i]['sock'] = socket_accept($sock);
                echo "New client connected $irn";
                break;
            }
            elseif ($i == $max_clients - 1)
                echo "Too many clients...rn";
        }
    } // end if in_array

    // If a client is trying to write - handle it now
    for ($i = 0; $i < $max_clients; $i++) { // for each client
        if (isset($client[$i]['sock'])) {
            if (in_array($client[$i]['sock'], $read)) {
                $input = socket_read($client[$i]['sock'], 1024);
                if ($input == null) {
                    echo "Client disconnecting $irn";
                    // Zero length string meaning disconnected
                    unset($client[$i]);
                } else {
                    echo "New input received $irn";
                    // send it to the other clients
                    for ($j = 0; $j < $max_clients; $j++) {
                        if (isset($client[$j]['sock']) && $j != $i) {
                            echo "Writing '$input' to client $jrn";
                            socket_write($client[$j]['sock'], $input, strlen($input));
                        }
                    }
                    if ($input == 'exit') {
                        // requested disconnect
                        socket_close($client[$i]['sock']);
                    }
                }
            } else {
                echo "Client disconnected $irn";
                // Close the socket
                socket_close($client[$i]['sock']);
                unset($client[$i]);
            }
        }
    }
} // end while
// Close the master sockets
socket_close($sock);

2.4无连接的套接字

  实现这种连接类型的主要协议是用户数据报协议(更为人熟知的是其缩写UDP)。为了创建UDP
套接字,必须使用SOCK_DGRAM 作为套接字类型。

socket()模块函数

 我们先用一个实例来说明socket函数的基本使用,在本实例中会创建一个 TCP
服务器程序,这个程序会把客户发送过来的字符串加上一个时间戳(格式:'[时间]数据’)返回给客户。

TCP 时间戳服务器 (t_server.py):

 

import socket
import time

host = '127.0.0.1'
port = 21567
bufsiz = 1024
add = (host,port)

tcpSerSock = socket.socket()
tcpSerSock.bind(add)
tcpSerSock.listen(5)

while True:
    print('waiting for connection ... ')
    tcpCliSock,address = tcpSerSock.accept()
    print('...connected from : ',address)
    while True:
        recv_data = tcpCliSock.recv(bufsiz)
        recv_data = str(recv_data,encoding='utf-8')
        if not recv_data:
            break
        tcpCliSock.sendall(bytes('[%s] %s' % (time.ctime(),recv_data),encoding='utf-8'))

 

sk.bind(address)

  s.bind(address)
将套接字绑定到地址。address地址的格式取决于地址族。在AF_INET下,以元组(host,port)的形式表示地址。

sk.listen(backlog)

  开始监听传入连接。backlog指定在拒绝连接之前,可以挂起的最大连接数量。

     
backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5
      这个值不能无限大,因为要在内核中维护连接队列

sk.setblocking(bool)

  是否阻塞(默认True),如果设置False,那么accept和recv时一旦无数据,则报错。

sk.accept()

  接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址。

  接收TCP 客户的连接(阻塞式)等待连接的到来

sk.connect(address)

  连接到address处的套接字。一般,address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。

sk.connect_ex(address)

  同上,只不过会有返回值,连接成功时返回 0
,连接失败时候返回编码,例如:10061

sk.close()

  关闭套接字

sk.recv(bufsize[,flag])

  接受套接字的数据。数据以字符串形式返回,bufsize指定最多可以接收的数量。flag提供有关消息的其他信息,通常可以忽略。

sk.recvfrom(bufsize[.flag])

  与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址。

sk.send(string[,flag])

  将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小。即:可能未将指定内容全部发送。

sk.sendall(string[,flag])

  将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常。

      内部通过递归调用send,将所有内容发送出去。

sk.sendto(string[,flag],address)

  将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议。

sk.settimeout(timeout)

  设置套接字操作的超时期,timeout是一个浮点数,单位是秒。值为None表示没有超时期。一般,超时期应该在刚创建套接字时设置,因为它们可能用于连接的操作(如
client 连接最多等待5s )

sk.getpeername()

  返回连接套接字的远程地址。返回值通常是元组(ipaddr,port)。

sk.getsockname()

  返回套接字自己的地址。通常是一个元组(ipaddr,port)

sk.fileno()

  套接字的文件描述符

 

TCP 时间戳客户端(t_client.py) :

import socket

host = '127.0.0.1'
port = 21567
bufsiz = 1024
addr = (host,port)

tcpCliSock = socket.socket()
tcpCliSock.connect(addr)

while True:
    data = input('> ')
    if not data:
        break
    tcpCliSock.sendall(bytes(data,encoding='utf-8'))
    data = tcpCliSock.recv(bufsiz)
    recv_data = str(data,encoding='utf-8')
    if not recv_data:
        break
    print(recv_data)

tcpCliSock.close()

运行我们的客户端与服务器程序 :

先运行服务端t_server.py文件

澳门新浦京手机版 4

下面就是客户端的输入与输出,不输入数据,直接按回车键就可以退出程序:

澳门新浦京手机版 5

上面的实例中,无论发送还是接收,都是以字符串的形式,下面一个实例是发送图片文件及接收图片文件:

澳门新浦京手机版 6

 1 import socket
 2 
 3 sk = socket.socket()
 4 sk.bind(('127.0.0.1',9999,))
 5 sk.listen(5)
 6 
 7 while True:
 8     conn,address = sk.accept()
 9     conn.sendall(bytes('welcome to conair',encoding='utf-8'))
10 
11     #先接收文件大小,然后再开始接收
12     file_size = str(conn.recv(1024),encoding='utf-8')
13 
14     #为解决粘包,设置的一个标志
15     conn.sendall(bytes('ko',encoding = 'utf-8'))
16 
17     total_size = int(file_size)
18     has_recv = 0
19 
20     f = open('new.jpg','wb')
21 
22     while True:
23         if total_size == has_recv:
24             break
25         data = conn.recv(1024)
26         f.write(data)
27         has_recv += len(data)
28     f.close()

服务器端文件
澳门新浦京手机版 7

 1 import socket
 2 import os
 3 
 4 obj = socket.socket()
 5 obj.connect(('127.0.0.1',9999))
 6 
 7 ret_bytes = obj.recv(1024)
 8 ret_str = str(ret_bytes,encoding='utf-8')
 9 print(ret_str)
10 
11 #获取图片文件大小,然后发送
12 size = os.stat('06.jpg').st_size
13 obj.sendall(bytes(str(size),encoding='utf-8'))
14 
15 obj.recv(1024)
16 
17 with open('06.jpg','rb') as f :
18     for line in f:
19         obj.sendall(line)
20 
21 obj.close()

客户端文件

啊呀,乍一看这似乎是一个大工程,但是我们可以先将它分解为几个较小的部分。第一部分是创建服务器。Lines:2至20。

3、python中的网络编程

socketserver模块实现并发操作

SocketServer内部使用 IO多路复用 以及 “多线程” 和 “多进程”
,从而实现并发处理多个客户端请求的Socket服务端。即:每个客户端请求连接到服务器时,Socket服务端都会在服务器是创建一个“线程”或者“进程”
专门负责处理当前客户端的所有请求。

澳门新浦京手机版 8

ThreadingTCPServer

ThreadingTCPServer实现的Soket服务器内部会为每个client创建一个
线程”,该线程用来和客户端进行交互。

1、ThreadingTCPServer基础

使用ThreadingTCPServer:

  • 创建一个继承自 SocketServer.BaseRequestHandler 的类
  • 类中必须定义一个名称为 handle 的方法
  • 启动ThreadingTCPServer

澳门新浦京手机版 9

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import SocketServer
 4 
 5 class MyServer(SocketServer.BaseRequestHandler):
 6 
 7     def handle(self):
 8         # print self.request,self.client_address,self.server
 9         conn = self.request
10         conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
11         Flag = True
12         while Flag:
13             data = conn.recv(1024)
14             if data == 'exit':
15                 Flag = False
16             elif data == '0':
17                 conn.sendall('通过可能会被录音.balabala一大推')
18             else:
19                 conn.sendall('请重新输入.')
20 
21 
22 if __name__ == '__main__':
23     server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
24     server.serve_forever()
25 
26 SocketServer实现服务器

socketserver服务端
澳门新浦京手机版 10

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 
 7 ip_port = ('127.0.0.1',8009)
 8 sk = socket.socket()
 9 sk.connect(ip_port)
10 sk.settimeout(5)
11 
12 while True:
13     data = sk.recv(1024)
14     print 'receive:',data
15     inp = raw_input('please input:')
16     sk.sendall(inp)
17     if inp == 'exit':
18         break
19 
20 sk.close()
21 
22 客户端

socketserver客户端

2、ThreadingTCPServer源码剖析

ThreadingTCPServer的类图关系如下:

澳门新浦京手机版 11

内部调用流程为:

  • 启动服务端程序
  • 执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和
    端口
  • 执行 BaseServer.__init__
    方法,将自定义的继承自SocketServer.BaseRequestHandler 的类
    MyRequestHandle赋值给 self.RequestHandlerClass
  • 执行 BaseServer.server_forever 方法,While
    循环一直监听是否有客户端请求到达 …
  • 当客户端连接到达服务器
  • 执行 ThreadingMixIn.process_request 方法,创建一个 “线程”
    用来处理请求
  • 执行 ThreadingMixIn.process_request_thread 方法
  • 执行 BaseServer.finish_request
    方法,执行 self.RequestHandlerClass()  即:执行 自定义
    MyRequestHandler
    的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用
    MyRequestHandler的handle方法)

ThreadingTCPServer相关源码:

澳门新浦京手机版 12

  1 class BaseServer:
  2 
  3     """Base class for server classes.
  4 
  5     Methods for the caller:
  6 
  7     - __init__(server_address, RequestHandlerClass)
  8     - serve_forever(poll_interval=0.5)
  9     - shutdown()
 10     - handle_request()  # if you do not use serve_forever()
 11     - fileno() -> int   # for select()
 12 
 13     Methods that may be overridden:
 14 
 15     - server_bind()
 16     - server_activate()
 17     - get_request() -> request, client_address
 18     - handle_timeout()
 19     - verify_request(request, client_address)
 20     - server_close()
 21     - process_request(request, client_address)
 22     - shutdown_request(request)
 23     - close_request(request)
 24     - handle_error()
 25 
 26     Methods for derived classes:
 27 
 28     - finish_request(request, client_address)
 29 
 30     Class variables that may be overridden by derived classes or
 31     instances:
 32 
 33     - timeout
 34     - address_family
 35     - socket_type
 36     - allow_reuse_address
 37 
 38     Instance variables:
 39 
 40     - RequestHandlerClass
 41     - socket
 42 
 43     """
 44 
 45     timeout = None
 46 
 47     def __init__(self, server_address, RequestHandlerClass):
 48         """Constructor.  May be extended, do not override."""
 49         self.server_address = server_address
 50         self.RequestHandlerClass = RequestHandlerClass
 51         self.__is_shut_down = threading.Event()
 52         self.__shutdown_request = False
 53 
 54     def server_activate(self):
 55         """Called by constructor to activate the server.
 56 
 57         May be overridden.
 58 
 59         """
 60         pass
 61 
 62     def serve_forever(self, poll_interval=0.5):
 63         """Handle one request at a time until shutdown.
 64 
 65         Polls for shutdown every poll_interval seconds. Ignores
 66         self.timeout. If you need to do periodic tasks, do them in
 67         another thread.
 68         """
 69         self.__is_shut_down.clear()
 70         try:
 71             while not self.__shutdown_request:
 72                 # XXX: Consider using another file descriptor or
 73                 # connecting to the socket to wake this up instead of
 74                 # polling. Polling reduces our responsiveness to a
 75                 # shutdown request and wastes cpu at all other times.
 76                 r, w, e = _eintr_retry(select.select, [self], [], [],
 77                                        poll_interval)
 78                 if self in r:
 79                     self._handle_request_noblock()
 80         finally:
 81             self.__shutdown_request = False
 82             self.__is_shut_down.set()
 83 
 84     def shutdown(self):
 85         """Stops the serve_forever loop.
 86 
 87         Blocks until the loop has finished. This must be called while
 88         serve_forever() is running in another thread, or it will
 89         deadlock.
 90         """
 91         self.__shutdown_request = True
 92         self.__is_shut_down.wait()
 93 
 94     # The distinction between handling, getting, processing and
 95     # finishing a request is fairly arbitrary.  Remember:
 96     #
 97     # - handle_request() is the top-level call.  It calls
 98     #   select, get_request(), verify_request() and process_request()
 99     # - get_request() is different for stream or datagram sockets
100     # - process_request() is the place that may fork a new process
101     #   or create a new thread to finish the request
102     # - finish_request() instantiates the request handler class;
103     #   this constructor will handle the request all by itself
104 
105     def handle_request(self):
106         """Handle one request, possibly blocking.
107 
108         Respects self.timeout.
109         """
110         # Support people who used socket.settimeout() to escape
111         # handle_request before self.timeout was available.
112         timeout = self.socket.gettimeout()
113         if timeout is None:
114             timeout = self.timeout
115         elif self.timeout is not None:
116             timeout = min(timeout, self.timeout)
117         fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
118         if not fd_sets[0]:
119             self.handle_timeout()
120             return
121         self._handle_request_noblock()
122 
123     def _handle_request_noblock(self):
124         """Handle one request, without blocking.
125 
126         I assume that select.select has returned that the socket is
127         readable before this function was called, so there should be
128         no risk of blocking in get_request().
129         """
130         try:
131             request, client_address = self.get_request()
132         except socket.error:
133             return
134         if self.verify_request(request, client_address):
135             try:
136                 self.process_request(request, client_address)
137             except:
138                 self.handle_error(request, client_address)
139                 self.shutdown_request(request)
140 
141     def handle_timeout(self):
142         """Called if no new request arrives within self.timeout.
143 
144         Overridden by ForkingMixIn.
145         """
146         pass
147 
148     def verify_request(self, request, client_address):
149         """Verify the request.  May be overridden.
150 
151         Return True if we should proceed with this request.
152 
153         """
154         return True
155 
156     def process_request(self, request, client_address):
157         """Call finish_request.
158 
159         Overridden by ForkingMixIn and ThreadingMixIn.
160 
161         """
162         self.finish_request(request, client_address)
163         self.shutdown_request(request)
164 
165     def server_close(self):
166         """Called to clean-up the server.
167 
168         May be overridden.
169 
170         """
171         pass
172 
173     def finish_request(self, request, client_address):
174         """Finish one request by instantiating RequestHandlerClass."""
175         self.RequestHandlerClass(request, client_address, self)
176 
177     def shutdown_request(self, request):
178         """Called to shutdown and close an individual request."""
179         self.close_request(request)
180 
181     def close_request(self, request):
182         """Called to clean up an individual request."""
183         pass
184 
185     def handle_error(self, request, client_address):
186         """Handle an error gracefully.  May be overridden.
187 
188         The default is to print a traceback and continue.
189 
190         """
191         print '-'*40
192         print 'Exception happened during processing of request from',
193         print client_address
194         import traceback
195         traceback.print_exc() # XXX But this goes to stderr!
196         print '-'*40
197 
198 BaseServer

BaseServer
澳门新浦京手机版 13

  1 class TCPServer(BaseServer):
  2 
  3     """Base class for various socket-based server classes.
  4 
  5     Defaults to synchronous IP stream (i.e., TCP).
  6 
  7     Methods for the caller:
  8 
  9     - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
 10     - serve_forever(poll_interval=0.5)
 11     - shutdown()
 12     - handle_request()  # if you don't use serve_forever()
 13     - fileno() -> int   # for select()
 14 
 15     Methods that may be overridden:
 16 
 17     - server_bind()
 18     - server_activate()
 19     - get_request() -> request, client_address
 20     - handle_timeout()
 21     - verify_request(request, client_address)
 22     - process_request(request, client_address)
 23     - shutdown_request(request)
 24     - close_request(request)
 25     - handle_error()
 26 
 27     Methods for derived classes:
 28 
 29     - finish_request(request, client_address)
 30 
 31     Class variables that may be overridden by derived classes or
 32     instances:
 33 
 34     - timeout
 35     - address_family
 36     - socket_type
 37     - request_queue_size (only for stream sockets)
 38     - allow_reuse_address
 39 
 40     Instance variables:
 41 
 42     - server_address
 43     - RequestHandlerClass
 44     - socket
 45 
 46     """
 47 
 48     address_family = socket.AF_INET
 49 
 50     socket_type = socket.SOCK_STREAM
 51 
 52     request_queue_size = 5
 53 
 54     allow_reuse_address = False
 55 
 56     def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
 57         """Constructor.  May be extended, do not override."""
 58         BaseServer.__init__(self, server_address, RequestHandlerClass)
 59         self.socket = socket.socket(self.address_family,
 60                                     self.socket_type)
 61         if bind_and_activate:
 62             try:
 63                 self.server_bind()
 64                 self.server_activate()
 65             except:
 66                 self.server_close()
 67                 raise
 68 
 69     def server_bind(self):
 70         """Called by constructor to bind the socket.
 71 
 72         May be overridden.
 73 
 74         """
 75         if self.allow_reuse_address:
 76             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 77         self.socket.bind(self.server_address)
 78         self.server_address = self.socket.getsockname()
 79 
 80     def server_activate(self):
 81         """Called by constructor to activate the server.
 82 
 83         May be overridden.
 84 
 85         """
 86         self.socket.listen(self.request_queue_size)
 87 
 88     def server_close(self):
 89         """Called to clean-up the server.
 90 
 91         May be overridden.
 92 
 93         """
 94         self.socket.close()
 95 
 96     def fileno(self):
 97         """Return socket file number.
 98 
 99         Interface required by select().
100 
101         """
102         return self.socket.fileno()
103 
104     def get_request(self):
105         """Get the request and client address from the socket.
106 
107         May be overridden.
108 
109         """
110         return self.socket.accept()
111 
112     def shutdown_request(self, request):
113         """Called to shutdown and close an individual request."""
114         try:
115             #explicitly shutdown.  socket.close() merely releases
116             #the socket and waits for GC to perform the actual close.
117             request.shutdown(socket.SHUT_WR)
118         except socket.error:
119             pass #some platforms may raise ENOTCONN here
120         self.close_request(request)
121 
122     def close_request(self, request):
123         """Called to clean up an individual request."""
124         request.close()
125 
126 TCPServer

TCPServer
澳门新浦京手机版 14

 1 class ThreadingMixIn:
 2     """Mix-in class to handle each request in a new thread."""
 3 
 4     # Decides how threads will act upon termination of the
 5     # main process
 6     daemon_threads = False
 7 
 8     def process_request_thread(self, request, client_address):
 9         """Same as in BaseServer but as a thread.
10 
11         In addition, exception handling is done here.
12 
13         """
14         try:
15             self.finish_request(request, client_address)
16             self.shutdown_request(request)
17         except:
18             self.handle_error(request, client_address)
19             self.shutdown_request(request)
20 
21     def process_request(self, request, client_address):
22         """Start a new thread to process the request."""
23         t = threading.Thread(target = self.process_request_thread,
24                              args = (request, client_address))
25         t.daemon = self.daemon_threads
26         t.start()
27 
28 ThreadingMixIn

ThreadingMixIn
澳门新浦京手机版 15

1 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

ThreadingTCPServer

RequestHandler相关源码

澳门新浦京手机版 16

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

SocketServer.BaseRequestHandler

SocketServer.BaseRequestHandler

实例:

澳门新浦京手机版 17

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import SocketServer
 4 
 5 class MyServer(SocketServer.BaseRequestHandler):
 6 
 7     def handle(self):
 8         # print self.request,self.client_address,self.server
 9         conn = self.request
10         conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
11         Flag = True
12         while Flag:
13             data = conn.recv(1024)
14             if data == 'exit':
15                 Flag = False
16             elif data == '0':
17                 conn.sendall('通过可能会被录音.balabala一大推')
18             else:
19                 conn.sendall('请重新输入.')
20 
21 
22 if __name__ == '__main__':
23     server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
24     server.serve_forever()
25 
26 服务端

server
澳门新浦京手机版 18

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 
 7 ip_port = ('127.0.0.1',8009)
 8 sk = socket.socket()
 9 sk.connect(ip_port)
10 sk.settimeout(5)
11 
12 while True:
13     data = sk.recv(1024)
14     print 'receive:',data
15     inp = raw_input('please input:')
16     sk.sendall(inp)
17     if inp == 'exit':
18         break
19 
20 sk.close()
21 
22 客户端

client

源码精简:

import socket
import threading
import select


def process(request, client_address):
    print request,client_address
    conn = request
    conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
    flag = True
    while flag:
        data = conn.recv(1024)
        if data == 'exit':
            flag = False
        elif data == '0':
            conn.sendall('通过可能会被录音.balabala一大推')
        else:
            conn.sendall('请重新输入.')

sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk.bind(('127.0.0.1',8002))
sk.listen(5)

while True:
    r, w, e = select.select([sk,],[],[],1)
    print 'looping'
    if sk in r:
        print 'get request'
        request, client_address = sk.accept()
        t = threading.Thread(target=process, args=(request, client_address))
        t.daemon = False
        t.start()

sk.close()

如精简代码可以看出,SocketServer的ThreadingTCPServer之所以可以同时处理请求得益于 select 和 Threading 两个东西,其实本质上就是在服务器端为每一个客户端创建一个线程,当前线程用来处理对应客户端的请求,所以,可以支持同时n个客户端链接(长连接)。

ForkingTCPServer

ForkingTCPServer和ThreadingTCPServer的使用和执行流程基本一致,只不过在内部分别为请求者建立
“线程”  和 “进程”。

基本使用:

澳门新浦京手机版 19

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 import SocketServer
 4 
 5 class MyServer(SocketServer.BaseRequestHandler):
 6 
 7     def handle(self):
 8         # print self.request,self.client_address,self.server
 9         conn = self.request
10         conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
11         Flag = True
12         while Flag:
13             data = conn.recv(1024)
14             if data == 'exit':
15                 Flag = False
16             elif data == '0':
17                 conn.sendall('通过可能会被录音.balabala一大推')
18             else:
19                 conn.sendall('请重新输入.')
20 
21 
22 if __name__ == '__main__':
23     server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
24     server.serve_forever()
25 
26 服务端

服务端
澳门新浦京手机版 20

 1 #!/usr/bin/env python
 2 # -*- coding:utf-8 -*-
 3 
 4 import socket
 5 
 6 
 7 ip_port = ('127.0.0.1',8009)
 8 sk = socket.socket()
 9 sk.connect(ip_port)
10 sk.settimeout(5)
11 
12 while True:
13     data = sk.recv(1024)
14     print 'receive:',data
15     inp = raw_input('please input:')
16     sk.sendall(inp)
17     if inp == 'exit':
18         break
19 
20 sk.close()
21 
22 客户端

客户端

以上ForkingTCPServer只是将 ThreadingTCPServer 实例中的代码:

server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
变更为:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)

SocketServer的ThreadingTCPServer之所以可以同时处理请求得益于 select 和 os.fork 两个东西,其实本质上就是在服务器端为每一个客户端创建一个进程,当前新创建的进程用来处理对应客户端的请求,所以,可以支持同时n个客户端链接(长连接)。

 

这部分代码设置了变量、地址、端口、最大客户端和客户端数组。接下来创建socket并将其绑定到我们指定的地址和端口上。

3.1socket 模块

IO多路复用

I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

Linux

Linux中的 select,poll,epoll 都是IO多路复用的机制。

澳门新浦京手机版 21

 1 select
 2 
 3 select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些
 4 文件描述符从而进行后续的读写操作。
 5 select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
 6 select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
 7 另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()
 8 会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
 9 
10 poll
11 
12 poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
13 poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
14 另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,
15 这种方式称为水平触发(Level Triggered)。
16 
17 epoll
18 
19 直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
20 epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),
21 理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
22 epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中
23 依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
24 另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个
25 文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

View Code

Python

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll
从而实现IO多路复用。

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测
普通文件操作 自动上次读取是否已经变化。

对于select方法:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)

参数: 可接受四个参数(前三个必须)
返回值:三个列表

select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

利用select监听多端口实例:

import socket

sk1 = socket.socket()
sk1.bind(('127.0.0.1',8001))
sk1.listen()

sk2 = socket.socket()
sk2.bind(('127.0.0.1',8002))
sk2.listen()

sk3 = socket.socket()
sk3.bind(('127.0.0.1',8003))
sk3.listen()

inputs = [sk1,sk2,sk3]
import select

while True:
    # [sk1,sk2,sk3]内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化
    # 如果有人连接sk1  ->  r_list = [sk1]
    # 如果有人同时连接sk1,sk2  ->  r_list = [sk1,sk2]
    r_list,w_list,e_list = select.select(inputs,[],inputs,1)
    for sk in r_list:
        # 每一个连接对象
        conn,address = sk.accept()
        conn.sendall(bytes('hello',encoding='utf-8'))
        conn.close()

    #当sk1,sk2,sk3其中有哪个有错误的时候,就将哪个给移除
    for sk in e_list:
        inputs.remove(sk)

澳门新浦京手机版 22

 1 import socket
 2 sk1 = socket.socket()
 3 sk1.bind(('127.0.0.1',8001))
 4 sk1.listen()
 5 
 6 inputs = [sk1,]
 7 import select
 8 
 9 while True:
10     # [sk1,sk2,sk3]内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化
11     # 如果有人连接sk1  ->  r_list = [sk1]
12     # 如果有人第一次连接,sk1发生变化
13     r_list,w_list,e_list = select.select(inputs,[],inputs,1)
14 
15     print('正在监听的socket对象%d'% len(inputs))
16     print(r_list)
17     for sk1_or_conn in r_list:
18         # 每一个连接对象
19         if sk1_or_conn == sk1:
20             # sk==sk1 表示有新用户来连接了
21             conn,address = sk1_or_conn.accept()
22             inputs.append(conn)
23         else:
24             # 有老用户发消息来了
25             try:
26                 data_bytes = sk1_or_conn.recv(1024)
27                 data_str = str(data_bytes,encoding='utf-8')
28                 sk1_or_conn.sendall(bytes(data_str+'好',encoding='utf-8'))
29             except Exception as ex:
30                 inputs.remove(sk1_or_conn)

利用select实现伪同时处理多个Socket客户端请求:服务端
澳门新浦京手机版 23

 1 import socket
 2 sk1 = socket.socket()
 3 sk1.bind(('127.0.0.1',8001))
 4 sk1.listen()
 5 
 6 inputs = [sk1,]
 7 outputs = []
 8 message_dict = {}
 9 
10 import select
11 
12 while True:
13     r_list,w_list,e_list = select.select(inputs,outputs,inputs,1)
14     print('正在监听的socket对象%d'% len(inputs))
15     print(r_list)
16     # 这个for循环只用于处理接收
17     for sk1_or_conn in r_list:
18         # 每一个连接对象
19         if sk1_or_conn == sk1:
20             # sk==sk1 表示有新用户来连接了
21             conn,address = sk1_or_conn.accept()
22             inputs.append(conn)
23             message_dict[conn] = []
24         else:
25             # 有老用户发消息来了
26             try:
27                 data_bytes = sk1_or_conn.recv(1024)
28             except Exception as ex:
29                 print(ex)
30                 inputs.remove(sk1_or_conn)
31             else:
32                 #用户正常发送消息
33                 data_str = str(data_bytes,encoding='utf-8')
34                 message_dict[sk1_or_conn].append(data_str)
35 
36                 outputs.append(sk1_or_conn)
37     # 这个for循环用户处理发送信息
38     # w_list仅仅保存了谁给我发过消息
39     for conn in w_list:
40         recv_str = message_dict[conn][0]
41         del message_dict[conn][0]
42         conn.sendall(bytes(recv_str+'好',encoding='utf-8'))
43         outputs.remove(conn)

利用select实现伪同时处理多个Socket客户端请求:服务端优化版

 

下面我们要做的事情就是执行一个死循环(实际上我们是故意的!)。Lines:22至32。在这部分代码中我们做的第一步是设置 $read 数组。此数组包含所有客户端的套接字和我们主服务器的套接字。这个变量稍后会用于select语句:告诉PHP监听来自这些客户端的每一条消息。

socket模块属性

澳门新浦京手机版 24

澳门新浦京手机版 25

澳门新浦京手机版 26

本节总结

一:socket,服务端同时只能处理一个请求

二:select + socket,伪并发

  a : r_list  既读又写

  b : r_list , w_list 读写分离

三:socketserver

  select/epoll + socket + 多线程    真正实现多并发操作

什么是客户/服务器架构?
什么是客户/服务器架构?不同的人有不同的答案。这要看你…

socket_select()的最后一个参数告诉我们的服务器在返回值之前最多等待5秒钟。如果它的返回值小于1,那么就表示没有收到任何数据,所以只需要返回循环顶部,继续等待。

套接字创建:

socket(socket_family, socket_type, protocol=0)
tcpSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ud pSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

socket对象方法:

澳门新浦京手机版 27

澳门新浦京手机版 28

澳门新浦京手机版 29

脚本的下一个部分,是增加新的客户端到数组中。Lines:33至44。

服务器创建:

ss = socket() # 创建服务器套接字
ss.bind() # 套接字与地址绑定
ss.listen() # 监听连接
inf_loop: # 服务器无限循环
cs = ss.accept() # 接受客户端连接
comm_loop: # 通信循环
cs.recv()/cs.send() # 对话(接收/发送)
cs.close() # 关闭客户端套接字
ss .close() # 关闭服务器套接字#(可选)

为服务器实现一个智能的退出方案时,建议调用close()方法。

将新的客户端放置在列表的末尾。检查以确保客户端的数量没有超过我们想要服务器处理的数量。

客户端创建:

cs = socket() # 创建客户端套接字
cs.connect() # 尝试连接服务器
comm_loop: # 通信循环
cs.send()/cs.recv() # 对话(发送/接收)
cs .close() # 关闭客户端套接字

下面要介绍的代码块相当大,也是服务器的主要部分。当客户端将消息发送到服务器时,就需要这块代码挺身而出来处理。消息可以是各种各样的,断开消息、实际断开——只要是服务器需要处理的消息。Lines:46至末尾。

 I/O多路复用

多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。这样在处理1000个连接时,只需要1个线程监控就绪状态,对就绪的每个连接开一个线程处理就可以了,这样需要的线程数大大减少,减少了内存开销和上下文切换的CPU开销。

举一个例子,模拟一个tcp服务器处理30个客户socket。

假设你是一个老师,让30个学生解答一道题目,然后检查学生做的是否正确,你有下面几个选择:

1.
第一种选择:按顺序逐个检查,先检查A,然后是B,之后是C、D。。。这中间如果有一个学生卡主,全班都会被耽误。
这种模式就好比,你用循环挨个处理socket,根本不具有并发能力。

  1. 第二种选择:你创建30个分身,每个分身检查一个学生的答案是否正确。
    这种类似于为每一个用户创建一个进程或者线程处理连接。
    3.
    第三种选择,你站在讲台上等,谁解答完谁举手。这时C、D举手,表示他们解答问题完毕,你下去依次检查C、D的答案,然后继续回到讲台上等。此时E、A又举手,然后去处理E和A。。。
    这种就是IO复用模型,Linux下的select、poll和epoll就是干这个的。将用户socket对应的fd注册进epoll,然后epoll帮你监听哪些socket上有消息到达,这样就避免了大量的无用操作。此时的socket应该采用非阻塞模式
    这样,整个过程只在调用select、poll、epoll这些调用的时候才会阻塞,收发客户消息是不会阻塞的,整个进程或者线程就被充分利用起来,这就是事件驱动,所谓的reactor模式。

澳门新浦京手机版 30

代码循环通过每个客户端并检查是否收到来自于它们的消息。如果是,获取输入的内容。根据输入来检查这是否是一个断开消息,如果是那就从数组中删除它们,反之,那它就是一个正常的消息,那我们的服务器再次通过所有客户端,并一个一个写信息给他们,跳过发送者。

方法:

windows python:

``提供: select

Mac Python:

``提供: select

Linux Python:

``提供: select、poll、epoll

 

select方法:


select  -在单线程网络服务中器程序中,管理多个套接字连接。

select的原型为(rlist,wlist,xlist[,timeout]),其中rlist是等待读取的对象,wlist是等待写入的对象,xlist是等待异常的对象,最后一个是可选对象,指定等待的时间,单位是s.

select()方法的返回值是准备好的对象的三元组,若在timeout的时间内,没有对象准备好,那么返回值将是空的列表,它采用的是轮询的方式来实现异步通信的。

更详细参照:

好了,下面试试创造你自己的聊天服务器吧!

服务器:

    #!/usr/bin/python  

    'test TCP server'  

    from socket import *  
    from time import ctime  
    import select  
    import sys  

    HOST = ''  
    PORT = 21567  
    BUFSIZ = 1024  
    ADDR = (HOST, PORT)  

    tcpSerSock = socket(AF_INET, SOCK_STREAM)  
    tcpSerSock.bind(ADDR)  
    tcpSerSock.listen(5)  
    input = [tcpSerSock, sys.stdin]     #input是一个列表,初始有欢迎套接字以及标准输入  

    while True:  
        print 'waiting for connection...'  
        tcpCliSock, addr = tcpSerSock.accept()  
        print '...connected from:',addr  
        input.append(tcpCliSock)    #将服务套接字加入到input列表中  
        while True:  
            readyInput,readyOutput,readyException = select.select(input,[],[])  #从input中选择,轮流处理client的请求连接(tcpSerSock),client发送来的消息(tcpCliSock),及服务器端的发送消息(stdin)  
            for indata in readyInput:  
                if indata==tcpCliSock:    #处理client发送来的消息  
                    data = tcpCliSock.recv(BUFSIZ)  
                    print data  
                    if data=='88':  
                        input.remove(tcpCliSock)  
                        break  
                else:             #处理服务器端的发送消息  
                    data = raw_input('>')  
                    if data=='88':  
                        tcpCliSock.send('%s' %(data))  
                        input.remove(tcpCliSock)  
                        break  
                    tcpCliSock.send('[%s] %s' %(ctime(), data))  
            if data=='88':  
                break  
        tcpCliSock.close()  
    tcpSerSock.close()  

客户端:

    #!/usr/bin/python  

    'test tcp client'  

    from socket import *  
    from time import ctime  
    import select  
    import sys  

    HOST = 'localhost'  
    PORT = 21567  
    BUFSIZ = 1024  
    ADDR = (HOST, PORT)  
    tcpCliSock = socket(AF_INET, SOCK_STREAM)  
    tcpCliSock.connect(ADDR)  
    input = [tcpCliSock,sys.stdin]  

    while True:  
        readyInput,readyOutput,readyException = select.select(input,[],[])  
        for indata in readyInput:  
            if indata==tcpCliSock:  
                data = tcpCliSock.recv(BUFSIZ)  
                print data  
                if data=='88':  
                    break     
            else:  
                data = raw_input('>')  
                if data=='88':    
                    tcpCliSock.send('%s' %(data))  
                    break  
                tcpCliSock.send('[%s] %s' %(ctime(), data))  
        if data=='88':    
            break  
    tcpCliSock.close()  

 

3.2 socketserver 模块

SocketServer内部使用 IO多路复用 以及 “多线程” 和 “多进程”
,从而实现并发处理多个客户端请求的Socket服务端。即:每个客户端请求连接到服务器时,Socket服务端都会在服务器是创建一个“线程”或者“进程”
专门负责处理当前客户端的所有请求。

该模块中的类

澳门新浦京手机版 31

基本用法:

 创建SocketServer TCP 服务器

# 请求处理程序MyRequestHandler,作为SocketServer
# 中StreamRequestHandler 的一个子类,并重写了它的handle()方法,该方法在基类Request 中
# 默认情况下没有任何行为。
# def handle(self):
# pass
# 当接收到一个来自客户端的消息时,它就会调用handle()方法。而StreamRequestHandler
# 类将输入和输出套接字看作类似文件的对象,因此我们将使用readline()来获取客户端消息,
# 并利用write()将字符串发送回客户端。
from socketserver import TCPServer as TCP,StreamRequestHandler as SRH
from time import  ctime
HOST=''
PORT='21567'
ADDR=(HOST,PORT)

class MyRequestHandle(SRH):
    def handle(self):
        print('...connected from:{0}',self.client_address)
        self.wfile.write('[%s]%s'%(ctime(),self.rfile.readline()))
tcpSever=TCP(ADDR,MyRequestHandle)
print('waiting for connection')
tcpSever.serve_forever()

 创建TCP客户端

from socket import *
HOST='localhost'
PORT='21567'
BUFSIZ=1024
ADDR=(HOST,PORT)


while True:
    tcpClisocket=socket(ADDR,SOCK_STREAM)
    tcpClisocket.connect(ADDR)
    data=input('>')
    if not data:
        break
        # 因为这里使用的处理程序类对待套
        # 接字通信就像文件一样,所以必须发送行终止符(回车和换行符)
        # 而服务器只是保留并重用这里发送的终止符。当得到从服务器返回的消息时,用strip()
        # 函数对其进行处理并使用由print声明自动提供的换行符。
    tcpClisocket.send('%srn'%data)
    data=tcpClisocket.recv(BUFSIZ)
    if not data:
        break
    print(data.strip())
    tcpClisocket.close()

 ThreadingTCPServer

ThreadingTCPServer实现的Soket服务器内部会为每个client创建一个 “线程”,该线程用来和客户端进行交互。

参照:

  • 创建一个继承自 SocketServer.BaseRequestHandler 的类
  • 类中必须定义一个名称为 handle 的方法
  • 启动ThreadingTCPServer

澳门新浦京手机版 32

 

#     启动服务端程序
#     执行 TCPServer.__init__ 方法,创建服务端Socket对象并绑定 IP 和 端口
#     执行 BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler 的类 MyRequestHandle赋值给 self.RequestHandlerClass
#     执行 BaseServer.server_forever 方法,While 循环一直监听是否有客户端请求到达 ...
#     当客户端连接到达服务器
#     执行 ThreadingMixIn.process_request 方法,创建一个 “线程” 用来处理请求
#     执行 ThreadingMixIn.process_request_thread 方法
#     执行 BaseServer.finish_request 方法,执行 self.RequestHandlerClass()  即:执行 自定义 MyRequestHandler 的构造方法(自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用 MyRequestHandler的handle方法)
# 服务器
# #!/usr/bin/env python
# # -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('通过可能会被录音.balabala一大推')
            else:
                conn.sendall('请重新输入.')


if __name__ == '__main__':
    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()

ForkingTcpSever

ForkingTCPServer和ThreadingTCPServer的使用和执行流程基本一致,只不过在内部分别为请求者建立
“线程”  和 “进程”。

3.3 Twisted模块

Twisted
是一个完整的事件驱动的网络框架,利用它既能使用也能开发完整的异步网络应用程序和协议。它提供了大量的支持来建立完整的系统,包括网络协议、线程、安全性和身份验证、聊天/
IM、DBM 及RDBMS 数据库集成、Web/因特网、电子邮件、命令行参数、GUI
集成工具包等。与SocketServer 类似,Twisted
的大部分功能都存在于它的类中。

 Twisted Reactor TCP服务器

基于protocol
类创建TSServProtocol,重写connectionMade()和dataReceived()方法,当一个客户端连接到服务器时就会执行connectionMade()方法,而当服务器接收到客户端通过网络发送的一些数据时就会调用dataReceived()方法。
reactor
会作为该方法的一个参数在数据中传输,这样就能在无须自己提取它的情况下访问它。在服务器代码的最后部分中,创建了一个协议工厂。它之所以被称为工厂,是因为每次得到一个接入连接时,都能“制造”协议的一个实例。然后在reactor
中安装一个TCP
监听器,以此检查服务请求。当它接收到一个请求时,就会创建一个TSServProtocol
实例来处理那个客户端的事务。

from twisted.internet import protocol,reactor
from time import ctime
PORT=21567
class TSServProtocal(protocol.Protocol):
    def connectionMade(self):
        clnt=self.clnt=self.transport.getPeer().host
        print('...conected from:',clnt)
    def dataReceived(self, data):
        self.transport.write('[%s]%s'%(ctime(),data))

factory=protocol.Factory()
factory.protocol=TSServProtocal
print('waiting for conneciton')
reactor.listenTCP(PORT,factory)
reactor.run()

 Twisted Reactor TCP客户端

from twisted.internet import protocol, reactor
HOST='localhost'
PORT=21567
class TSClntProtocal(protocol.Protocol):
    def sendData(self):
        data=input('>')
        if data:
            print('...sending%...'%data)
        else:
            self.transport.loseConnection()
    def connectionMade(self):
        self.sendData()
    def dataReceived(self, data):
        print(data)
class TSClntFactory(protocol.Factory):
    protocol=TSClntProtocal
    clientConnectionLost=clientConnectionFailed=
    lambda self,connector,reason:reactor.stop()
reactor.connectTCP(HOST,PORT,TSClntFactory())
reactor.run()

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注