# -*- coding: UTF-8 -*-
"""Rule checks ("reference", "sum_row", "sum_col") for loaded excel tables.

``check`` is the entry point: it reads the rules attached to an excel object
through ``table.get_rules`` and returns a list of error messages, or ``None``
when there is nothing to report.
"""
import functools
import operator
import os

import total
import table

# Largest absolute difference still treated as "equal" when comparing sums.
_SUM_TOLERANCE = 0.000001

# Comparison operators recognised in a filter expression.  Order matters:
# the two-character operators must be probed before '>' and '<', otherwise
# 'a>=1' would be split on '>'.
_FILTER_OPS = (
    ('==', operator.eq),
    ('!=', operator.ne),
    ('>=', operator.ge),
    ('<=', operator.le),
    ('>', operator.gt),
    ('<', operator.lt),
)


def wrap_error(hint):
    """Return a decorator that prefixes any error raised by a function.

    Every ``Exception`` escaping the decorated function is replaced by a
    plain ``Exception`` whose message is ``'<hint> : <original message>'``.
    """
    def wrap(f):
        @functools.wraps(f)  # keep the wrapped function's name and docstring
        def wrap_func(*args, **argv):
            try:
                return f(*args, **argv)
            except Exception as e:
                raise Exception('%s : %s' % (hint, e))
        return wrap_func
    return wrap


def get_all_item(data, t, t_m, st):
    """Collect every value of type ``st`` nested inside ``data``.

    :param data: value to search; falsy values yield an empty list.
    :param t: type of ``data``.  A list ``[elem_type]`` means ``data`` is a
        sequence whose elements have type ``elem_type``.
    :param t_m: mapping from a type name to an iterable of
        ``(field, field_type)`` pairs.
    :param st: type name being searched for.
    :return: list of the values whose type equals ``st``.  Types that are not
        keys of ``t_m`` are not descended into.
    """
    if not data:
        return []
    if isinstance(t, list):
        found = []
        for sub_data in data:
            found += get_all_item(sub_data, t[0], t_m, st)
        return found
    if t not in t_m:
        return []
    if t == st:
        return [data]
    found = []
    for field, field_type in t_m[t]:
        found += get_all_item(data[field], field_type, t_m, st)
    return found


@wrap_error('get_data')
def get_data(row_data, path, kts, type_mapping, filter_path=None, filter_op=None, filter_value=None):
    """Extract the values addressed by ``path`` from a single row.

    :param row_data: mapping holding the row's values.
    :param path: ``[field]`` to read a field of the row directly, or
        ``[type_name, field]`` to read ``field`` from every nested value of
        type ``type_name`` found in the row.
    :param kts: column descriptors exposing ``name`` and ``type``; falsy
        entries are skipped.
    :param type_mapping: type name -> iterable of ``(field, type)`` pairs.
    :param filter_path: key path of the optional filter.  With a one-element
        ``path`` it only switches filtering on (the value itself is
        compared); otherwise its last element is the key that is compared.
    :param filter_op: two-argument predicate ``(candidate, filter_value)``.
    :param filter_value: right-hand operand handed to ``filter_op``.
    :return: list of the selected values.
    :raises Exception: any failure, prefixed with ``get_data`` by the
        decorator.
    """
    if len(path) == 1:
        if path[0] not in row_data:
            return []
        try:
            data = row_data.get(path[0])
        except Exception:
            # Debug aid kept from the original implementation.  print() with
            # a single argument behaves the same on Python 2 and Python 3.
            print(row_data)
            print(path)
            raise
        if not isinstance(data, list):
            data = [data]
        if filter_path:
            return [item for item in data if filter_op(item, filter_value)]
        return data

    # len(path) == 2
    a, b = path[0], path[1]
    sub_items = []
    for kt in kts:
        if kt:
            sub_items += get_all_item(row_data.get(kt.name), kt.type, type_mapping, a)

    # List comprehensions instead of map(): the result stays a real list on
    # Python 3 as well.
    if not filter_path:
        return [item[b] for item in sub_items]
    if len(filter_path) == 1:
        filter_key = filter_path[0]
    elif len(filter_path) == 2:
        filter_key = filter_path[1]
    else:
        raise Exception(u"rule 无法过滤: %s" % filter_path)
    return [item[b] for item in sub_items
            if filter_op(item.get(filter_key), filter_value)]


@wrap_error('check_path')
def check_path(kts, type_mapping, path, filter_path):
    """Validate a rule's field path (and filter path) against the schema.

    NOTE: ``path`` and ``filter_path`` are modified in place.  When
    ``path[0]`` names a column rather than a type it is replaced by that
    column's type (its element type when the column type is a list), and a
    two-element ``filter_path`` gets the same replacement in position 0.
    Callers pass the same lists on to ``get_data`` afterwards.

    :param kts: column descriptors exposing ``name`` and ``type``; falsy
        entries are skipped.
    :param type_mapping: type name -> iterable whose items start with the
        field name.
    :param path: one- or two-element list of path components.
    :param filter_path: ``None`` or a list of filter key components.
    :raises Exception: when either path is invalid, prefixed with
        ``check_path`` by the decorator.
    """
    ks = [kt.name for kt in kts if kt]
    if len(path) == 1:
        if path[0] not in ks:
            raise Exception("rule 字段路径错误: %s" % path)
        if filter_path and len(filter_path) > 1:
            raise Exception("rule 无法过滤: %s" % filter_path)
    elif len(path) == 2:
        a, b = path[0], path[1]
        if a not in type_mapping:
            if a not in ks:
                raise Exception(u"rule 字段路径错误:%s" % path)
            # ``a`` is a column name: swap it for the column's type.
            path[0] = [kt.type for kt in kts if kt and kt.name == a][0]
            if isinstance(path[0], list):
                path[0] = path[0][0]
            if filter_path and len(filter_path) == 2:
                filter_path[0] = path[0]
            a = path[0]
        sub_types = [s[0] for s in type_mapping[a]]
        if b not in sub_types:
            raise Exception(u"rule 字段路径错误:%s" % path)
        if not filter_path:
            return
        if len(filter_path) == 1:
            if filter_path[0] not in sub_types:
                raise Exception(u"rule 无法过滤: %s" % filter_path)
        elif len(filter_path) == 2:
            c, d = filter_path[0], filter_path[1]
            if c != a or d not in sub_types:
                raise Exception(u"rule 无法过滤: %s" % filter_path)
        else:
            raise Exception(u"rule 无法过滤: %s" % filter_path)
    else:
        raise Exception(u"rule 字段路径错误: %s" % path)


@wrap_error('get_filter')
def get_filter(pattern):
    """Parse a filter expression such as ``'heroCost.id!=0'``.

    :param pattern: ``<dotted key><op><value>`` where ``<op>`` is one of
        ``==``, ``!=``, ``>=``, ``<=``, ``>``, ``<``.
    :return: ``(key_path, compare, value)``: the key split on ``'.'``, a
        two-argument comparison function, and the value converted to ``int``
        or ``float`` when it parses as one (left as a string otherwise).
    :raises Exception: when no operator is present, prefixed with
        ``get_filter`` by the decorator.
    """
    for symbol, compare in _FILTER_OPS:
        if symbol in pattern:
            ss = pattern.split(symbol)
            key, value = ss[0], ss[1]
            break
    else:
        raise Exception(u'rule 过滤格式错误 %s' % pattern)

    # Try int first so that negative integers stay integers, then float;
    # anything else is compared as a string.
    try:
        value = int(value)
    except ValueError:
        try:
            value = float(value)
        except ValueError:
            pass
    return key.split('.'), compare, value


def _optional_filter(rule, length):
    """Parse the trailing filter of ``rule`` when it has ``length`` items.

    Returns the ``get_filter`` triple for the last item, or
    ``(None, None, None)`` when the rule carries no filter.
    """
    if len(rule) == length:
        return get_filter(rule[length - 1])
    return None, None, None


def check_reference(excel_obj, rule, error_msg_list):
    """Check that referenced keys exist in another table.

    :param excel_obj: mapping with ``kts``, ``type_mapping``, ``file_path``,
        ``file_name`` and ``data`` (the rows).
    :param rule: ``[kind, path, referenced_file, referenced_key_name]`` plus
        an optional fifth item holding a filter expression.
    :param error_msg_list: list that error messages are appended to.
    """
    path = rule[1].split('.')
    filter_path, filter_op, filter_value = _optional_filter(rule, 5)
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    # The referenced file is resolved relative to the checked file.
    reference_file_path = os.path.join(os.path.dirname(excel_obj["file_path"]), rule[2])
    reference_excel_obj = total.get_excel_obj(reference_file_path)
    if reference_excel_obj is None:
        error_msg = (u"找不到" + excel_obj["file_path"] + u"表的" + rule[1] +
                     u"所引用的" + reference_file_path + u"表")
        error_msg_list.append(error_msg)
        return

    reference_key_set = reference_excel_obj['key_set']
    for row in excel_obj['data']:
        key_data_list = get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                                 filter_path, filter_op, filter_value)
        for key in key_data_list:
            if key not in reference_key_set:
                error_msg = (excel_obj["file_name"] + u"表的" + str(row) + u"引用" +
                             rule[2] + u"表的" + rule[3] + ":" + str(key) + u"出错")
                error_msg_list.append(error_msg)


def check_row_sum(excel_obj, rule, error_msg_list):
    """Check that the selected values of every row add up to a fixed sum.

    :param excel_obj: mapping with ``kts``, ``type_mapping``, ``file_name``
        and ``data`` (the rows).
    :param rule: ``[kind, expected_sum, path]`` plus an optional fourth item
        holding a filter expression.
    :param error_msg_list: list that error messages are appended to.
    """
    sum_row = rule[1]
    path = rule[2].split('.')
    filter_path, filter_op, filter_value = _optional_filter(rule, 4)
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    for row in excel_obj['data']:
        data_sum = sum(get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                                filter_path, filter_op, filter_value))
        if abs(data_sum - sum_row) > _SUM_TOLERANCE:
            error_msg = (excel_obj["file_name"] + u"表,第" + str(row) + u"行的和应为: " +
                         str(sum_row) + u" 实为: " + str(data_sum))
            error_msg_list.append(error_msg)


def check_col_sum(excel_obj, rule, error_msg_list):
    """Check that the selected values of all rows together add up to a sum.

    :param excel_obj: mapping with ``kts``, ``type_mapping``, ``file_name``
        and ``data`` (the rows).
    :param rule: ``[kind, expected_sum, path]`` plus an optional fourth item
        holding a filter expression.
    :param error_msg_list: list that error messages are appended to.
    """
    sum_col = rule[1]
    path = rule[2].split('.')
    filter_path, filter_op, filter_value = _optional_filter(rule, 4)
    check_path(excel_obj['kts'], excel_obj['type_mapping'], path, filter_path)

    data_sum = 0.0
    # Iterate the rows themselves.  The previous code called
    # range(excel_obj['data']), which raises TypeError for a list of rows.
    for row in excel_obj['data']:
        data_sum += sum(get_data(row, path, excel_obj['kts'], excel_obj['type_mapping'],
                                 filter_path, filter_op, filter_value))
    if abs(sum_col - data_sum) > _SUM_TOLERANCE:
        error_msg = (excel_obj["file_name"] + u"表,列的和应为: " + str(sum_col) +
                     u" 实为: " + str(data_sum))
        error_msg_list.append(error_msg)


def check(excel_obj):
    """Run every rule attached to ``excel_obj``.

    :param excel_obj: excel object handed to ``table.get_rules`` and to the
        individual rule checkers.
    :return: ``None`` when there are no rules or no violations; otherwise a
        list of messages, each prefixed with the file path.  If a checker
        raises, the list holds that single error and the violations gathered
        up to that point are dropped.
    """
    rules = table.get_rules(excel_obj)
    if not rules:
        return None

    error_msg_list = []
    try:
        for rule in rules:
            kind = rule[0]
            if kind == "reference":
                check_reference(excel_obj, rule, error_msg_list)
            elif kind == "sum_row":
                check_row_sum(excel_obj, rule, error_msg_list)
            elif kind == "sum_col":
                check_col_sum(excel_obj, rule, error_msg_list)
    except Exception as e:
        # A malformed rule aborts the whole check and is reported by itself.
        return ['%s : %s' % (excel_obj['file_path'], e)]

    if error_msg_list:
        return ['%s : %s' % (excel_obj['file_path'], error) for error in error_msg_list]
    return None


if __name__ == '__main__':
    print(get_filter('heroCost.id!=0'))