2023-04-03 11:04:31 +08:00

180 lines
6.1 KiB
Python

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import util
import multiprocessing
import table
all_table = []
struct_config = dict()
def read_struct_config(get_max_row, get_max_col, access):
success = True
global struct_config
max_row = get_max_row()
max_col = get_max_col()
for i in range(4, max_row):
name = access(i, 0)
if not name:
break
attribs = []
for j in range(1, max_col):
platform = access(1, j)
if platform == "none":
continue
attrib = access(i, j)
if not attrib:
break
kv = attrib.split(':')
k, v = kv[0], kv[1]
attribs.append((k, v))
if struct_config.get(name) != None:
success = False
print(u"[ERROR]struct.xlsx 有同名的自定义结构")
struct_config[name] = attribs
return success
def init_config_struct(folder_path):
success = True
struct_path = os.path.join(folder_path, 'struct.xlsx')
if os.path.exists(struct_path):
from openpyxl import load_workbook
wb = load_workbook(struct_path, data_only=True)
for sheet in wb:
gmr, gmc, access, access_color, get_rows = util.xlsx_get_function(sheet)
success = success & read_struct_config(gmr, gmc, access)
wb.close()
return success
def convert(split_path_list, struct_config, config=None):
convert_table_list = []
error_list = []
for i in range(len(split_path_list)):
try:
table_infos = table.convert(split_path_list[i], struct_config, config)
except BaseException as info:
print (split_path_list[i]["folder_path"], split_path_list[i]['file_path'])
import traceback
exc_info = sys.exc_info()
traceback.print_exception(*exc_info)
st = split_path_list[i]["file_path"][len(split_path_list[i]["folder_path"]):] + u"" + str(info)
error_list.append(st)
return st
if table_infos != None:
for i in range(len(table_infos)):
convert_table_list.append(table_infos[i])
return convert_table_list
def search_convert(folder_path, dump_path, isDev):
path_info_list = []
global all_table
all_table = []
dst_folder_map = {}
if os.path.exists('./config.json'):
config = util.read_json_file('./config.json')
elif os.path.exists(os.path.join(folder_path, 'config.json')):
config = util.read_json_file(os.path.join(folder_path, 'config.json'))
else:
config = None
if not isDev and isinstance(config, dict):
config['lua_suffix'] = config['lua_suffix'] + ".bytes"
for root, dirs, files in os.walk(folder_path, True):
for name in files:
if not util.is_excel_file(name) or util.ignore(folder_path, root, name, config):
continue
forbid = util.forbid(name, config)
if forbid:
print (forbid)
return
ab_path = os.path.join(root, name)
path_info = {}
path_info["folder_path"] = folder_path
path_info["dump_path"] = dump_path
path_info["file_path"] = ab_path
path_info["file_size"] = os.path.getsize(ab_path)
path_info_list.append(path_info)
# 提前创建好目录
if not dst_folder_map.get(root):
dst_folder_path = dump_path + root[len(folder_path):]
if not os.path.exists(dst_folder_path):
os.makedirs(dst_folder_path)
dst_folder_map[root] = 1
if len(path_info_list) > 0:
# 先分分为几组,每组的文件大小的总和相近
path_info_list.sort(key=lambda k:k["file_size"], reverse=False)
cpu_count = multiprocessing.cpu_count()
split_path_list = []
for i in range(cpu_count):
files = {}
files["total_size"] = 0
files["path_list"] = []
split_path_list.append(files)
while len(path_info_list) > 0:
element = path_info_list.pop()
split_path_list[0]["total_size"] += element["file_size"]
split_path_list[0]["path_list"].append(element)
split_path_list.sort(key=lambda k:k["total_size"], reverse=False)
pool = multiprocessing.Pool(processes=cpu_count)
pool_result = []
global struct_config
for i in range(cpu_count):
pool_result.append(pool.apply_async(convert, (split_path_list[i]["path_list"], struct_config, config)))
pool.close()
pool.join()
error_msg_list = []
for i in pool_result:
convert_table_result = i.get()
if isinstance(convert_table_result, list):
for j in range(len(convert_table_result)):
all_table.append(convert_table_result[j])
else:
error_msg_list.append(convert_table_result)
if len(error_msg_list) > 0:
print("********************************")
for msg in error_msg_list:
print(msg)
print("********************************")
return False
else:
return True
else:
return False
def convert_all_excel(folder_path, dump_path, isDev):
success = init_config_struct(folder_path)
success = success & search_convert(folder_path, dump_path, isDev)
if success:
# success = check_all_rule()
if success:
return 0
else:
return 2
else:
return 1
# def check_all_rule():
# if len(all_table) > 0:
# error_msg_list = []
# for i in range(len(all_table)):
# error_msgs = rule.check(all_table[i])
# if error_msgs is not None:
# error_msg_list.append(error_msgs)
# if len(error_msg_list) > 0:
# for i in range(len(error_msg_list)):
# for j in range(len(error_msg_list[i])):
# print(error_msg_list[i][j])
# return False
# else:
# return True
# else:
# return False