Слияние кода завершено, страница обновится автоматически
# encoding:utf-8
# 封装一些群与用户关系相关的业务功能
import time, random, threading
from db.wxpyDbGroupUser import *
from db.wxpyDbAccessFrq import *
from db.wxpyDbUser import *
from wxpyUserService import *
g_updateUserLock = threading.Lock()
# 启线程更新群关系
def updateGroupAndUserThread(bot, dayGap):
# 整个项目只需要一个线程
if not g_updateUserLock.acquire(False):
logInfo('updateUse ing')
return
logInfo('updateGroupAndUserThrea lock start: '+str(g_updateUserLock))
thread = threading.Thread(target=updateUser,args=(bot,dayGap))
thread.setDaemon(True)
thread.start()
# 更新数据库中保存的好友信息
# bot: 当前登录用户
# dayGap: 隔多少天更新一次,单位天
def updateUser(bot, dayGap):
rand = random.randint(4,16)
logInfo('updateUserThrea sleep : '+str(rand))
time.sleep(rand)
UPDATE_USER_TYPE = 'Bot.groups'
try:
dbd = dbData()
# 获取上次更新好友日期
latestAccess = dbGetAccessFrqLatest(dbd, bot.self.puid, UPDATE_USER_TYPE)
if latestAccess is not None:
logInfo('updateUserThrea latest: '+str(latestAccess['create_time']))
cut = latestAccess['create_time'].now()-latestAccess['create_time']
if cut.days<dayGap :
logInfo('updateUserThrea less than dayGap: '+str(cut))
return
# 否则是要更新的
# 取所有好友数据
timeStart = time.perf_counter()
groups = bot.groups(update=True, contact_only=False)
timeEnd = time.perf_counter()
timeSpend = int((timeEnd-timeStart)*1000000000) # 秒乘10的9次方
dbAddAccessFrq(dbd, bot.self, UPDATE_USER_TYPE, timeSpend)
timeStart = time.perf_counter()
for g in groups:
addGroup(dbd, g)
timeEnd = time.perf_counter()
logInfo('batch addGroupMembe spend time: '+str(timeEnd-timeStart))
except Exception as e:
dealException()
logError('updateUserThrea err: '+str(e))
else:
logInfo('updateUserThrea success')
finally:
g_updateUserLock.release()
logInfo('updateGroupAndUserThrea lock release: '+str(g_updateUserLock))
closeDb(dbd)
# 更新组成员信息(默认最短一小时一次)
# rightNow: 是否强制立即更新,不管数据库最近一次时间间隔
def updateGroupMember(dbd, group, rightNow=False):
UPDATE_GROUP_MEMBER_TYPE = 'Group.update_group'
try:
if not rightNow:
# 获取上次更新好友日期
latestAccess = dbGetAccessFrqLatest(dbd, group.puid, UPDATE_GROUP_MEMBER_TYPE)
if latestAccess is not None:
logInfo('updateGroupMembe latest: '+str(latestAccess['create_time']))
cut = latestAccess['create_time'].now()-latestAccess['create_time']
# 更新群信息时间为一小时
if cut.days<=0 and cut.seconds<60*60 :
logInfo('updateGroupMembe less than an hour')
return
# 否则是要更新的
logInfo('updateGroupMembe update group: '+str(group))
# 更新组成员信息
timeStart = time.perf_counter()
group.update_group(members_details=True)
timeEnd = time.perf_counter()
timeSpend = int((timeEnd-timeStart)*1000000000) # 秒乘10的9次方
dbAddAccessFrq(dbd, group, UPDATE_GROUP_MEMBER_TYPE, timeSpend)
except Exception as e:
dealException()
logError('updateGroupMembe err: '+str(e))
else:
logInfo('updateGroupMembe success: '+group.puid)
# 由于 Group.members 取到的成员有些可能没有 puid
# 因此封装一个方法获取 Group 的成员,不要直接用 Group.members
# rightNow: 是否强制立即更新,不管数据库最近一次时间间隔
def getGroupMembers(dbd, group, rightNow=False):
updateGroupMember(dbd, group, rightNow)
# 去掉没有 puid 的用户
members = []
for m in group.members:
puid = getMemberPuid(m)
if puid is not None:
members.append(m)
else:
logError('getGroupMember getMemberPui empty: '+str(group))
return members
# 以 groupPuid 为 key 的锁列表
g_addGroupLockDict = {}
g_addGroupLock = threading.Lock()
# 启线程更新群成员数据
def addGroupThread(group):
# 防止并发操作 g_addGroupLockDict
with g_addGroupLock:
# 是否已有锁
lock = None
logInfo('addGroupThrea in: '+ str(group.puid))
if group.puid in g_addGroupLockDict.keys():
logInfo('addGroupThrea has lock: '+ str(group.puid))
lock = g_addGroupLockDict[group.puid]
if not lock.acquire(False):
# 拿不到锁证明已经有线程在跑,直接退出
logInfo('addGroupThrea locking: '+ str(group.puid))
return
# else: 拿到锁往下走
logInfo('addGroupThrea get lock: '+ str(group.puid))
else:
logInfo('addGroupThrea no lock: '+ str(group.puid))
lock = threading.Lock()
g_addGroupLockDict[group.puid] = lock
lock.acquire()
logInfo('addGroupThrea lock start: '+str(lock))
thread = threading.Thread(target=addGroupNoDb,args=(group,lock))
thread.setDaemon(True)
thread.start()
logInfo('addGroupThrea out: '+ str(group.puid))
# 将群与用户关系存到表中,不存在才添加,不删除已经离开群组的人
def addGroupNoDb(group, lock):
re = False
try:
dbd = dbData()
re = addGroup(dbd, group)
except Exception as e:
dealException()
logError('addGroupNoD err: '+str(e))
finally:
lock.release()
logInfo('addGroupThrea lock release: '+str(lock))
closeDb(dbd)
return re
# 将群与用户关系存到表中,不存在才添加,不删除已经离开群组的人
def addGroup(dbd, group):
members = getGroupMembers(dbd, group)
logInfo('addGrou members num: ' + str(len(members)))
addOrUpdateUser(dbd, group)
if group.puid is None:
logError('addGrou group.puid empty: '+obj2JsonStr(group))
return False
oldUsers = dbGetGroupUsers(dbd, group.puid)
# 不存在群关系,直接全部插入
if oldUsers is None:
for m in members:
dbAddGroupUser(dbd, group.puid, m.puid)
return True
# else 已存在群关系,根据新关系更新
logInfo('addGrou old members num: ' + str(len(oldUsers)))
# 只处理不存在旧表中的用户(增加)
dealNotInOldUsers(dbd, group.puid, members, oldUsers)
# 只处理旧表中多出来(已退群)的用户(删除)
dealOldUsersExtra(dbd, group.puid, members, oldUsers)
return True
# 有时从群中取出的成员再取 puid 时会报错(TypeError: 'NoneType' object is not subscriptable)
# 因此用此方法来取 puid, 取不到返回 None
def getMemberPuid(member, times=1):
try:
return member.puid
except Exception as e:
dealException()
logError('getMemberPui get puid err: '+str(times)+' '+str(e))
logError('getMemberPui member: '+str(member)+'\r\n'+str(member.raw))
# 重试一次
if times<1:
return getMemberPuid(member, times+1)
return None
# 只处理不存在旧表中的用户(增加)
def dealNotInOldUsers(dbd, groupPuid, newUsers, oldUsers):
for newUser in newUsers:
appear = False
for oldUser in oldUsers:
if newUser.puid==oldUser['user_puid']:
appear = True
break
if not appear:
re = dbAddGroupUser(dbd, groupPuid, newUser.puid, True)
if re:
addOrUpdateUser(dbd, newUser)
# 只处理旧表中多出来(已退群)的用户(删除)
def dealOldUsersExtra(dbd, groupPuid, newUsers, oldUsers):
for oldUser in oldUsers:
appear = False
for newUser in newUsers:
if newUser.puid==oldUser['user_puid']:
appear = True
break
if not appear:
# 旧表多出来且未退群的情况
dbDelGroupUser(dbd, groupPuid, oldUser['user_puid'])
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )