2023-04-03 11:04:31 +08:00

236 lines
7.9 KiB
Python

# -*- coding: UTF-8 -*-
import os
import total
import table
def wrap_error(hint):
    """Return a decorator that prefixes errors from the wrapped function with *hint*.

    Any ``Exception`` raised by the decorated function is replaced by a plain
    ``Exception`` whose message is ``'<hint> : <original message>'``.

    :param hint: text placed in front of the original error message.
    :return: a decorator; the decorated function keeps its arguments and
        return value, as well as its ``__name__`` and ``__doc__``.
    """
    import functools  # imported locally so the decorator carries its own dependency

    def wrap(f):
        @functools.wraps(f)  # keep the wrapped function's name/docstring
        def wrap_func(*args, **argv):
            try:
                return f(*args, **argv)
            except Exception as e:
                raise Exception('%s : %s' % (hint, e))
        return wrap_func
    return wrap
def get_all_item(data, t, t_m, st):
    """Recursively collect every value of type *st* found inside *data*.

    :param data: the value to search; a falsy value yields an empty result.
    :param t: type descriptor of *data*. A list means "list of ``t[0]``";
        anything else is looked up as a key of *t_m*.
    :param t_m: mapping from a type name to an iterable of
        ``(field, field_type)`` pairs describing that type's fields.
    :param st: the type name being searched for.
    :return: list of the matching values, in traversal order.
    """
    if not data:
        return []

    # List type: search each element using the element type.
    if isinstance(t, list):
        found = []
        for element in data:
            found.extend(get_all_item(element, t[0], t_m, st))
        return found

    # Types without an entry in the mapping have no fields to descend into.
    if t not in t_m:
        return []

    if t == st:
        return [data]

    found = []
    for field_name, field_type in t_m[t]:
        found.extend(get_all_item(data[field_name], field_type, t_m, st))
    return found
@wrap_error('get_data')
def get_data(row_data, path, kts, type_mapping, filter_path=None, filter_op=None, filter_value=None):
    """Extract the values addressed by *path* from one row.

    One-component path: the value stored under ``path[0]`` in *row_data* is
    returned as a list (a non-list value is wrapped in a list). When
    *filter_path* is truthy, only items for which
    ``filter_op(item, filter_value)`` is true are kept.

    Two-component path: ``path[0]`` is a type name and ``path[1]`` a field of
    that type. Every item of that type reachable from the columns in *kts* is
    collected with ``get_all_item`` and the ``path[1]`` field of each is
    returned. When *filter_path* is truthy, its last component names the item
    field passed to ``filter_op`` together with *filter_value*.

    :param row_data: mapping of column name to cell value for one row.
    :param path: list with one or two path components.
    :param kts: column descriptors exposing ``name`` and ``type``; falsy
        entries are skipped.
    :param type_mapping: type table forwarded to ``get_all_item``.
    :param filter_path: optional list of one or two filter path components.
    :param filter_op: two-argument predicate, used only with *filter_path*.
    :param filter_value: right-hand operand handed to *filter_op*.
    :return: list of extracted values (empty when the column is absent).
    :raises Exception: when *filter_path* has more than two components
        (re-raised with a ``get_data`` prefix by the decorator).
    """
    if len(path) == 1:
        column = path[0]
        if column not in row_data:
            return []
        data = row_data.get(column)
        if not isinstance(data, list):
            data = [data]
        if filter_path:
            return [item for item in data if filter_op(item, filter_value)]
        return data

    # Two-component path: <type name>.<field name>
    type_name, field = path[0], path[1]
    sub_items = []
    for kt in kts:
        if kt:
            sub_items += get_all_item(row_data.get(kt.name), kt.type, type_mapping, type_name)

    if filter_path:
        if len(filter_path) == 1:
            filter_key = filter_path[0]
        elif len(filter_path) == 2:
            filter_key = filter_path[1]
        else:
            raise Exception(u"rule 无法过滤: %s" % filter_path)
        sub_items = [item for item in sub_items
                     if filter_op(item.get(filter_key), filter_value)]

    # Build a real list so callers get the same type on every interpreter.
    return [item[field] for item in sub_items]
@wrap_error('check_path')
def check_path(kts, type_mapping, path, filter_path):
    """Validate a rule's field path (and optional filter path) against the table schema.

    A one-component path must name a column in *kts* and may only be combined
    with a one-component filter path. A two-component path is
    ``<type or column>.<field>``: when the first component is not a key of
    *type_mapping* but is a column name, it is replaced IN PLACE by that
    column's type (the element type for list columns), and the first
    component of a two-component *filter_path* is rewritten to match.

    :param kts: column descriptors exposing ``name`` and ``type``; falsy
        entries are skipped.
    :param type_mapping: mapping from a type name to a sequence of entries
        whose first element is a field name.
    :param path: list of path components; may be modified in place.
    :param filter_path: ``None`` or a list of filter path components; may be
        modified in place.
    :raises Exception: when the path or filter path does not match the schema
        (re-raised with a ``check_path`` prefix by the decorator).
    """
    column_names = [kt.name for kt in kts if kt]
    depth = len(path)

    if depth == 1:
        if path[0] not in column_names:
            raise Exception("rule 字段路径错误: %s" % path)
        if filter_path and len(filter_path) > 1:
            raise Exception("rule 无法过滤: %s" % filter_path)
        return

    if depth != 2:
        raise Exception(u"rule 字段路径错误: %s" % path)

    owner, field = path[0], path[1]
    if owner not in type_mapping:
        if owner not in column_names:
            raise Exception(u"rule 字段路径错误:%s" % path)
        # The first component is a column name: swap in the column's type so
        # later lookups work on type names only.
        path[0] = [kt.type for kt in kts if kt and kt.name == owner][0]
        if isinstance(path[0], list):
            path[0] = path[0][0]
        if filter_path and len(filter_path) == 2:
            filter_path[0] = path[0]
        owner = path[0]

    field_names = [entry[0] for entry in type_mapping[owner]]
    if field not in field_names:
        raise Exception(u"rule 字段路径错误:%s" % path)
    if not filter_path:
        return

    filter_depth = len(filter_path)
    if filter_depth == 1:
        if filter_path[0] not in field_names:
            raise Exception(u"rule 无法过滤: %s" % filter_path)
    elif filter_depth == 2:
        if filter_path[0] != owner or filter_path[1] not in field_names:
            raise Exception(u"rule 无法过滤: %s" % filter_path)
    else:
        raise Exception(u"rule 无法过滤: %s" % filter_path)
@wrap_error('get_filter')
def get_filter(pattern):
    """Parse a filter expression such as ``'heroCost.id!=0'``.

    The expression is ``<dotted key><operator><value>`` where the operator is
    one of ``==``, ``!=``, ``>=``, ``<=``, ``>``, ``<``. Whitespace around the
    key components and around the value is ignored, so ``'a.b == 1'`` parses
    the same as ``'a.b==1'``.

    :param pattern: the filter expression text.
    :return: tuple ``(key_path, compare, value)`` where *key_path* is the key
        split on ``'.'``, *compare* is a two-argument predicate implementing
        the operator, and *value* is an ``int`` when the text is all digits,
        otherwise a ``float`` when it parses as one, otherwise the text itself.
    :raises Exception: when no operator is present (re-raised with a
        ``get_filter`` prefix by the decorator).
    """
    # Two-character operators come first so '>=' is never mistaken for '>'.
    comparators = (
        ('==', lambda x, y: x == y),
        ('!=', lambda x, y: x != y),
        ('>=', lambda x, y: x >= y),
        ('<=', lambda x, y: x <= y),
        ('>', lambda x, y: x > y),
        ('<', lambda x, y: x < y),
    )
    for symbol, compare in comparators:
        if symbol in pattern:
            pieces = pattern.split(symbol)
            key, value = pieces[0], pieces[1].strip()
            break
    else:
        raise Exception(u'rule 过滤格式错误 %s' % pattern)

    key_path = [part.strip() for part in key.split('.')]

    if value.isdigit():
        value = int(value)
    else:
        try:
            value = float(value)
        except ValueError:
            pass  # not numeric: compare against the raw text
    return key_path, compare, value
def check_reference(excel_obj, rule, error_msg_list):
    """Check that every value addressed by a ``reference`` rule exists in another table.

    ``rule[1]`` is the dotted field path, ``rule[2]`` the referenced file
    (resolved relative to the directory of ``excel_obj['file_path']``),
    ``rule[3]`` the referenced field name used in messages, and an optional
    ``rule[4]`` (present when the rule has five elements) a filter expression.

    :param excel_obj: table description; reads ``kts``, ``type_mapping``,
        ``file_path``, ``file_name`` and ``data``.
    :param rule: the rule sequence described above.
    :param error_msg_list: list that receives one message per problem found.
    """
    path = rule[1].split('.')
    filter_path = filter_op = filter_value = None
    if len(rule) == 5:
        filter_path, filter_op, filter_value = get_filter(rule[4])
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    base_dir = os.path.dirname(excel_obj["file_path"])
    reference_file_path = os.path.join(base_dir, rule[2])
    referenced = total.get_excel_obj(reference_file_path)

    if referenced is None:
        # The referenced table could not be loaded: report it once and stop.
        error_msg_list.append(
            u"找不到" + excel_obj["file_path"] + u"表的" + rule[1] + u"所引用的" + reference_file_path + u"")
        return

    known_keys = referenced['key_set']
    for row in excel_obj['data']:
        candidates = get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                              filter_path, filter_op, filter_value)
        for key in candidates:
            if key in known_keys:
                continue
            error_msg_list.append(
                excel_obj["file_name"] + u"表的" + str(row) + u"引用" + rule[2] + u"表的" + rule[3] +
                ":" + str(key) + u"出错")
def check_row_sum(excel_obj, rule, error_msg_list):
    """Check that, for every row, the values addressed by a ``sum_row`` rule add up to a target.

    ``rule[1]`` is the expected sum, ``rule[2]`` the dotted field path, and an
    optional ``rule[3]`` (present when the rule has four elements) a filter
    expression. A row fails when its sum differs from the target by more
    than ``0.000001``.

    :param excel_obj: table description; reads ``kts``, ``type_mapping``,
        ``file_name`` and ``data``.
    :param rule: the rule sequence described above.
    :param error_msg_list: list that receives one message per failing row.
    """
    expected = rule[1]
    path = rule[2].split('.')
    filter_path = filter_op = filter_value = None
    if len(rule) == 4:
        filter_path, filter_op, filter_value = get_filter(rule[3])
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    for row in excel_obj['data']:
        values = get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                          filter_path, filter_op, filter_value)
        actual = sum(values)
        if abs(actual - expected) > 0.000001:
            error_msg_list.append(
                excel_obj["file_name"] + u"表,第" + str(row) + u"行的和应为: " + str(expected) +
                u" 实为: " + str(actual))
def check_col_sum(excel_obj, rule, error_msg_list):
    """Check that the values addressed by a ``sum_col`` rule add up to a target across all rows.

    ``rule[1]`` is the expected total, ``rule[2]`` the dotted field path, and
    an optional ``rule[3]`` (present when the rule has four elements) a filter
    expression. The check fails when the total over every row differs from
    the target by more than ``0.000001``.

    :param excel_obj: table description; reads ``kts``, ``type_mapping``,
        ``file_name`` and ``data``.
    :param rule: the rule sequence described above.
    :param error_msg_list: list that receives a message when the total is off.
    """
    sum_col = rule[1]
    path = rule[2].split('.')
    filter_path = filter_op = filter_value = None
    if len(rule) == 4:
        filter_path, filter_op, filter_value = get_filter(rule[3])
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    data_sum = 0.0
    # Iterate the rows themselves; calling range() on the row collection
    # raised TypeError before any row was inspected.
    for row in excel_obj['data']:
        data_sum += sum(get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                                 filter_path, filter_op, filter_value))

    if abs(sum_col - data_sum) > 0.000001:
        error_msg = excel_obj["file_name"] + u"表,列的和应为: " + str(sum_col) + u" 实为: " + str(data_sum)
        error_msg_list.append(error_msg)
def check(excel_obj):
    """Run every rule attached to a table and report the problems found.

    Rules come from ``table.get_rules(excel_obj)``; the first element of each
    rule selects the checker (``reference``, ``sum_row`` or ``sum_col``).
    Rules of any other kind are ignored.

    :param excel_obj: table description; ``file_path`` prefixes every message.
    :return: ``None`` when there are no rules or no problems; otherwise a list
        of ``'<file_path> : <message>'`` strings. If a checker raises, the
        list holds only that exception's message.
    """
    rules = table.get_rules(excel_obj)
    if len(rules) == 0:
        return None

    error_msg_list = []
    try:
        for rule in rules:
            kind = rule[0]
            if kind == "reference":
                check_reference(excel_obj, rule, error_msg_list)
            elif kind == "sum_row":
                check_row_sum(excel_obj, rule, error_msg_list)
            elif kind == "sum_col":
                check_col_sum(excel_obj, rule, error_msg_list)
    except Exception as e:
        # A failing rule aborts the run; only its error is reported.
        return ['%s : %s' % (excel_obj['file_path'], e)]

    if not error_msg_list:
        return None
    return ['%s : %s' % (excel_obj['file_path'], error) for error in error_msg_list]
if __name__ == '__main__':
    # Manual smoke test: show the parsed form of a sample filter expression.
    print(get_filter('heroCost.id!=0'))