1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/zhengyitian-touj

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
testStream.py 921 Байт
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
a Отправлено 31.03.2020 14:11 bed36bf
import threading
from helpFunc import *
import time
import select
def doTest(sock):
sock.setblocking(0)
readBuf = b''
writeBuf = b''
readCo = writeCo = 0
staTime = time.time()
while True:
r = select.select([sock],[sock],[],1)
if r[0]:
readBuf += sock.recv(10000000)
while True:
msg = TOUMsg()
ret,readBuf = msg.unpack(readBuf)
if not ret:
break
readCo += 1
if r[1]:
if not writeBuf:
writeBuf = TOUMsg({},b's'*random.randint(10,2000)).pack()
writeCo += 1
n = sock.send(writeBuf)
writeBuf = writeBuf[n:]
if time.time()-staTime>1:
print (staTime,readCo,writeCo)
readCo = writeCo = 0
staTime = time.time()

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/zhengyitian-touj.git
git@api.gitlife.ru:oschina-mirror/zhengyitian-touj.git
oschina-mirror
zhengyitian-touj
zhengyitian-touj
master