1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/mrkelly-resources_packer

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
resources_packer 17 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
mrkelly Отправлено 18.06.2016 06:19 17fabd9
#!/bin/env python
#encoding=utf-8
"""
Author : chenpeilin <23110388@qq.com>
Website: http://github.com/mr-kelly/resources_packer
用于指定目录的差异zip包命令行工具,无项目逻辑依赖,可各个项目使用
注意点:跨版本,文件回滚; 相邻版本,文件删除
"""
import argparse, os, sys, zipfile, csv
import StringIO
import hashlib
import datetime
import shutil
import errno
MANIFEST_FILE_NAME ='.manifest'
MANIFEST_HEADERS = ['File', 'MD5', 'Size', 'MTime', 'MFormatTime']
def get_clean_path(dirtyPath):
"""
获取干净路径,主要替换windows的\\
"""
return dirtyPath.replace('\\', '/')
def get_stream_md5(stream):
"""
获取流的MD5
"""
return hashlib.md5(stream.read()).hexdigest().upper()
def get_file_md5(filePath):
"""
获取文件的MD5
"""
return get_stream_md5(open(filePath, 'rb'))
def verify_package(packagePath, writedFilesMap, deleted_files_map):
"""
校验文件内容是否跟manifest一致,一致则生成md5文件
"""
assert(os.path.isfile(packagePath))
with zipfile.ZipFile(packagePath, 'r', zipfile.ZIP_DEFLATED) as rz:
print 'Verify zip package correct ... %s' % packagePath
rManifestStream = rz.open(MANIFEST_FILE_NAME)
rManifestReader = csv.DictReader(rManifestStream, delimiter=' ', fieldnames=MANIFEST_HEADERS)
for row in rManifestReader:
# 忽略第一行header
if rManifestReader.line_num == 1:
continue
zipFilePath = row['File']
if not zipFilePath in writedFilesMap: # 上一次没有写入,忽略校验
continue
md5Str = row['MD5']
zipFileMd5 = get_stream_md5(rz.open(zipFilePath))
fullFile = writedFilesMap[zipFilePath]
assert(zipFileMd5 == md5Str)
assert(zipFileMd5 == get_file_md5(fullFile))
for deleted in deleted_files_map:
try:
rz.open(deleted)
except:
continue # 不存在就对了,存在反而不对
assert(False)
# 验证成功,生成md5文件
with open('%s.md5' % packagePath, 'wb+') as md5f:
md5f.write(get_file_md5(packagePath))
return True
def create_package(args, packagePath, paths, compareFilesMap=None, force_create=False):
"""
创建资源包
后面参数为空则是全量包了
force_create 默认如果没有任何文件差异,会自动删除掉文件,适用于跨版本差异包,文件回滚状态时,如3-7版本,虽然没有变化,但要强制创建
compareFilesMap 为上一版的文件列表
"""
print('@@ => Create Package ... %s' % packagePath)
writedFilesMap = {} # 写入过的文件统计 / zipPath -> fullPath
touchFilesMap = {} # 检查过但没有实际写入的文件统计 / zipPath -> fullPath
deleted_files_map = {} # 差异比较后,被删除的文件的列表
print paths
if os.path.isfile(packagePath):
print '[WARN]Delete and repack ... %s' % packagePath
os.remove(packagePath)
manifestStream = StringIO.StringIO()
manifestCsv = csv.DictWriter(manifestStream, delimiter=' ', fieldnames=MANIFEST_HEADERS)
manifestCsv.writeheader()
with zipfile.ZipFile(packagePath, 'w', zipfile.ZIP_DEFLATED) as zf:
for path in paths:
abspath = os.path.abspath(path)
if not os.path.isdir(abspath):
print('[WARN]Cannot found directory: %s, ignore it' % abspath)
continue
topDirPath = get_clean_path(os.path.dirname(abspath)) # 上级目录路径,替换掉这个字符串,就是最终目录相对路径了
print 'Top Dir: ' + topDirPath
print('')
for dirFullPath, subdirList, fileList in os.walk(abspath):
cleanDirFullPath = get_clean_path(dirFullPath)
# 文件夹相对zip的路径 = 完整路径 - 上层大目录路径
dirZipPath = cleanDirFullPath.replace(topDirPath, '')
for filePath in fileList:
fileFullPath = get_clean_path('%s/%s' % (cleanDirFullPath, filePath))
filename, file_extension = os.path.splitext(fileFullPath)
# 文件相对zip的路径
fileZipPath = get_clean_path('%s/%s' % (dirZipPath, filePath))
fileZipPath = fileZipPath.lstrip('/') # 去掉开始的/
# 扩展名判断
hasIgnoreExt = False
if args.ignores_ext != None:# and file_extension in args.ignores_ext:
for ext in args.ignores_ext:
if fileFullPath.endswith(ext):
hasIgnoreExt = True # 忽略该扩展名的
if hasIgnoreExt:
continue
# 是否激活黑名单模式
in_blacklist = False # 默认true
if args.blacklist: #黑名单存在则要判断
for black_item in args.blacklist:
if black_item in fileFullPath:
in_blacklist = True
break
if in_blacklist:
# 黑名单直接忽略
# if compareFilesMap:
# manifestCsv.writerow(compareFilesMap[fileZipPath])
continue
fileMD5 = get_file_md5(fileFullPath)
# 进行比较! 如果一样,不写入文件,但写入信息, 以下是写入文件
if ( \
compareFilesMap == None or \
(fileZipPath not in compareFilesMap) or \
(fileZipPath in compareFilesMap and compareFilesMap[fileZipPath]['MD5'] != fileMD5) \
):
print('%s --> %s' % (fileFullPath, fileZipPath))
zf.write(fileFullPath, fileZipPath)
writedFilesMap[fileZipPath] = fileFullPath
print 'write .... %s' % fileZipPath
# 使用整数时间戳
os.stat_float_times(False)
file_mtime = os.path.getmtime(fileFullPath)
touchFilesMap[fileZipPath] = fileFullPath
# 所有文件都写入这里,无例外
manifestCsv.writerow({
'File': fileZipPath,
'MD5': fileMD5,
'Size' : os.path.getsize(fileFullPath),
'MTime' : file_mtime,
'MFormatTime' : datetime.datetime.fromtimestamp(os.path.getmtime(fileFullPath))
})
zf.writestr(MANIFEST_FILE_NAME, manifestStream.getvalue())
# 判断有无文件删除
if compareFilesMap:
for filepath in compareFilesMap.keys():
if not filepath in touchFilesMap.keys():
print("[Deleted]Found a deleted file `%s`" % (filepath))
deleted_files_map[filepath] = compareFilesMap[filepath]
# 写入被删除的文件的列表
if len(deleted_files_map) > 0:
deletedStream= StringIO.StringIO()
for k in deleted_files_map.keys():
deletedStream.write('%s\n'%k)
zf.writestr('.deleted', deletedStream.getvalue())
#print manifestStream.getvalue()
print("Total Writed Files Count: %d" % len(writedFilesMap))
has_changed = len(writedFilesMap) > 0 or len(deleted_files_map) > 0
# 验证文件
verify_package(packagePath, writedFilesMap, deleted_files_map)
manifestStream.close()
# 处理有没有文件被删除的情况
# 如果没任何操作就直接退出,不写入版本,并且删除临时创建的zip
if not has_changed and not force_create:
# 没写入任何文件,删掉生成的zip直接退出吧
print("[WARN]No Files Changed, no need to Zip Resource : %s "% packagePath)
os.remove(packagePath)
return False
return True
def mkdir_p(path):
"""
same as command `mkdir -p`
"""
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def get_current_resource_version(versionFilePath):
"""
获取当前Resource Version返回 int
"""
strVersion = ''
with open(versionFilePath, 'rb+') as f:
strVersion = f.read()
return int(strVersion)
def create_package_diff(args, autoClean = True):
"""
创建指定新版本的新差异资源包
autoClean 是否创建完毕后,清理旧的差异包
"""
args.resVersion = args.newResVersion - 1
# 寻找上一版zip包, 总结跟现在的不同
i = args.resVersion + 1
bResult = True # 标记是否有任意失败
force_create_diff = False
while True:
i = i - 1
if i < 0:
break
_zipName = '%s.%d.zip' % ( args.package_name, i) # 用来对比差异的zip包
_zipPath = '%s/%s' % (args.artifact_dir, _zipName)
print ''
print '====>'
print 'ANANALYSE ==============> %s' % _zipName
compareFilesMap = {} # 文件历史记录在这
if not os.path.isfile(_zipPath):
print("[ERROR]Cannot find package: %s" % _zipName)
sys.exit(1)
# 读取上一版的manifeset
with zipfile.ZipFile(_zipPath, 'r') as rzip:
rManifestStream = rzip.open(MANIFEST_FILE_NAME)
rManifestReader = csv.DictReader(rManifestStream, delimiter=' ', fieldnames=MANIFEST_HEADERS)
for row in rManifestReader:
# 忽略第一行header
if rManifestReader.line_num == 1:
continue
compareFilesMap[row['File']] = row # 读取到所有记录
# 写入新版
newPackageName = '%s.%d-%d.zip' % (args.package_name, i, args.newResVersion)
newPackagePath = '%s/%s' % (args.artifact_dir, newPackageName)
print 'NEW PACK ==============> %s' % newPackageName
if create_package(args, newPackagePath, args.paths, compareFilesMap, force_create_diff):
# 当最后一个差异包生成成功后,要确保其它差异包强制生成!
if (i+1) == args.newResVersion: #
force_create_diff = True
pass
else:
# 任意一个没有构建成功直接退出吧
bResult = False
break
return bResult
def clean_old_packages(args):
"""
清理指定旧版本的所有资源包
"""
args.resVersion = get_current_resource_version(args.version_file)
cleanVersion = args.resVersion - 1 # 因为未知上一阶段是否生成生成,所以起始位上上个版本
for _oldVersion in range(1, cleanVersion + 1):
# 清理资源版本的顶头包,比如现在最新资源版本7, 那么0-6, 1-6等,已经用不上了
for i in range(0, _oldVersion):
oldPackageName = '%s.%d-%d.zip' % (args.package_name, i, _oldVersion)
oldPackagePath = '%s/%s' % (args.artifact_dir, oldPackageName)
archiveDir = '%s/archives' % args.artifact_dir
mkdir_p(archiveDir)
moveToPath = '%s/%s' % (archiveDir, oldPackageName)
if os.path.isfile(oldPackagePath):
shutil.move(oldPackagePath, moveToPath)
print('Clean/Archived `%s`' % oldPackageName)
def test_assert_nofile(zip_path, filepath):
"""
断点一个zip包里 不! 包含一个文件
"""
with zipfile.ZipFile(zip_path, 'r') as rzip:
try:
rzip.open(filepath)
except:
return # 不存在,则成功
assert(False)
def test_assert_file(zip_path, filepath):
"""
断点一个zip包里包含一个文件
"""
with zipfile.ZipFile(zip_path, 'r') as rzip:
assert(rzip.open(filepath))
def test(args):
"""
自动化测试
"""
print("A test for resources_packer, will generate a test folder under script")
script_path = os.path.dirname(os.path.realpath(__file__))
test_dirpath = os.path.join(script_path, '___test_resources_packer')
test_artifactpath = os.path.join(script_path, '___test_resources_packer_artifacts')
if os.path.exists(test_dirpath):
print('Delete test dir... %s' % test_dirpath)
shutil.rmtree(test_dirpath)
if os.path.exists(test_artifactpath):
print('Delete test dir... %s' % test_artifactpath)
shutil.rmtree(test_artifactpath)
os.makedirs(test_dirpath)
os.makedirs(os.path.join(test_dirpath, 'subdir'))
os.makedirs(test_artifactpath)
args.paths = [test_dirpath]
args.package_name = 'test_package'
args.artifact_dir = test_artifactpath
# create test file
with open(os.path.join(test_dirpath, 'a.file'), 'wb+') as wf:
wf.write('ok, this is a file')
args.action = 'init'
init_args(args)
do_init_or_check(args)
test_assert_file(os.path.join(test_artifactpath, 'test_package.0.zip'), '___test_resources_packer/a.file')
args.action = 'check'
init_args(args)
do_init_or_check(args)
# 1号包
with open(os.path.join(test_dirpath, 'a.file'), 'wb+') as wf:
wf.write('ok, this is a file but modified')
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-1.zip')))
test_assert_file(os.path.join(test_artifactpath, 'test_package.0-1.zip'), '___test_resources_packer/a.file')
# 无差异,不会生成资源包
args.action = 'pack'
init_args(args)
do_pack(args)
assert(not os.path.exists(os.path.join(test_artifactpath, 'test_package.0-.zip')))
# 2号包,回滚a文件内容, 测试跨版本,回滚同样的内容生成出包
with open(os.path.join(test_dirpath, 'a.file'), 'wb+') as wf:
wf.write('ok, this is a file')
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-2.zip')))
test_assert_nofile(os.path.join(test_artifactpath, 'test_package.0-2.zip'), '___test_resources_packer/a.file')
# 3号包
with open(os.path.join(test_dirpath, 'subdir', 'b.file'), 'wb+') as wf:
wf.write('ok, this is b file')
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-3.zip')))
test_assert_file(os.path.join(test_artifactpath, 'test_package.0-3.zip'), '___test_resources_packer/subdir/b.file')
# 4号包,回到了0号包的同样状态
os.remove(os.path.join(test_dirpath, 'subdir', 'b.file'))
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-4.zip')))
test_assert_nofile(os.path.join(test_artifactpath, 'test_package.0-4.zip'), '___test_resources_packer/subdir/b.file')
# 5号包,多生成写文件 c
with open(os.path.join(test_dirpath, 'subdir', 'c.file'), 'wb+') as wf:
wf.write('ok, this is c file')
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-5.zip')))
test_assert_file(os.path.join(test_artifactpath, 'test_package.0-5.zip'), '___test_resources_packer/subdir/c.file')
# 6号包,删除所有。。。
os.remove(os.path.join(test_dirpath, 'a.file'))
os.remove(os.path.join(test_dirpath, 'subdir', 'c.file'))
init_args(args)
do_pack(args)
assert(os.path.exists(os.path.join(test_artifactpath, 'test_package.0-6.zip')))
test_assert_nofile(os.path.join(test_artifactpath, 'test_package.0-6.zip'), '___test_resources_packer/subdir/b.file')
test_assert_nofile(os.path.join(test_artifactpath, 'test_package.0-6.zip'), '___test_resources_packer/subdir/c.file')
test_assert_nofile(os.path.join(test_artifactpath, 'test_package.0-6.zip'), '___test_resources_packer/a.file')
def do_pack(args):
"""
执行pack操作
"""
bResult = create_package_diff(args)
if bResult:
# 差异包生成完,生成对应版本的完整包
newFullPackageName = '%s.%d.zip' % (args.package_name, args.newResVersion)
newFullPackagePath = '%s/%s' % (args.artifact_dir, newFullPackageName)
create_package(args, newFullPackagePath, args.paths, None)
cleanVersion = args.resVersion
# 修改版本号,并写入
with open(args.version_file, 'wb+') as wf:
wf.write(str(args.newResVersion))
print('')
print('New Resource Version --> %s' % args.newResVersion)
# TODO: 暂时不清理,永久保留历史
# clean_old_packages(args)
def do_init_or_check(args):
# 直接生成完整包
# init相当于强制创建
if not os.path.isfile(args.get_main_package_path() ) or args.action == 'init': # 不存在,或init时强制重新创建版本完整包
create_package(args, args.get_main_package_path() , args.paths)
if args.resVersion != 0:
create_package_diff(args, False) # 重复生成差异包,这时候已经没有差异了,强制生成空的
print("[WARN]ReBuild Main Package because it has existed... %s" % args.get_main_package_path() )
else: # check and exist
# 存在的,不允许重新创建
print('[WARN]no need main package `%s`' % args.get_main_package_path() )
def init_args(args):
"""
初始化args,放入参数
"""
args.version_file = '%s/%s.resource_version.txt' % (args.artifact_dir, args.package_name)
if not os.path.isfile(args.version_file):
# 不存在则创建默认Versionfile版本号0
mkdir_p(args.artifact_dir)
with open(args.version_file, 'wb+') as wf:
wf.write('0')
print "Create Version file: %s" % args.version_file
strVersion = get_current_resource_version(args.version_file)
# 资源版本号在第4位
args.resVersion = int(strVersion)
args.newResVersion = args.resVersion + 1
print("Current Resource Version: %d" % args.resVersion)
print("Action: %s" % args.action)
if args.ignores_ext:
print('Ignores Ext: `%s`' % ','.join(args.ignores_ext))
if args.blacklist:
print('Blacklist: `%s`' % ','.join(args.blacklist))
args.get_main_package_name = lambda: '%s.%d.zip' % (args.package_name, args.resVersion)
args.get_main_package_path = lambda: '%s/%s' % (args.artifact_dir, args.get_main_package_name())
def main():
parser = argparse.ArgumentParser(description=u'资源包、差异包生成工具, 传入指定的文件目录、输出名字,生成zip')
parser.add_argument('-paths', required=True, nargs='+', help=u'路径,目录或文件,将作为zip的根路径')
parser.add_argument('-package-name', required=True, help=u'包名字前缀')
# parser.add_argument('-version-file-name', default='ResourceVersion.txt', help=u'包含一个数字的txt文件名,相对artifact-dir路径,默认ResourceVersion.txt')
parser.add_argument('-artifact-dir', required=True, help=u'最终成品目录?将搜索这些目录,找出过往的日志')
parser.add_argument('-action', choices=['init', 'check', 'pack', 'clean', 'test'], required=True, help=u'check只确保初始包存在,init则强制重新生成初始化包(不提版), pack则生成差异包(递进版本)')
parser.add_argument('-ignores-ext', nargs='*', help=u'彻底忽略的扩展名,如: .meta .gitignore')
parser.add_argument('-blacklist', nargs='*', help=u'类似ignores-ext, 激活黑名单模式,路径包含这些字符串才会被忽略')
args = parser.parse_args()
init_args(args)
if args.action == 'test':
test(args)
elif args.action == 'init' or args.action == 'check':
do_init_or_check(args)
elif args.action == 'clean':
clean_old_packages(args) # 清理旧版本
else:
do_pack(args)
if __name__ == '__main__':
main()

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/mrkelly-resources_packer.git
git@api.gitlife.ru:oschina-mirror/mrkelly-resources_packer.git
oschina-mirror
mrkelly-resources_packer
mrkelly-resources_packer
master