"""HTTP views for the WeChat official-account callbacks and the BaZi endpoints.

Most endpoints deliberately answer ``{"code": 200, "data": ...}`` even when the
calculation fails: the exception is printed and ``data`` is left empty, so that
clients always receive a well-formed envelope.
"""
import base64
import datetime
import json
import time
import traceback

from django.http import JsonResponse
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

import tools
from wx import logic
from wx.dbTools import DB_WX_GZH_User, DBYiXueUser, DBSysUser
from wx.parsers import GZHParser, JsonParser
from wx.wxModels import WXUserMsg, WXMenu, UserRequest, CalcNongLiRequest, GetSiZhuRequest, GetSiZhuByNongLiRequest, \
    GetSiZhuByTextsRequest, SaveUserRequest, LoadUserRequest, DeleteUserRequest, LoginRequest
from yixue import Models, DataCenter
from yixue.Models import BaseInfo

# Attributes copied verbatim from a UserRequest into DB_WX_GZH_User.insert().
_GZH_USER_FIELDS = (
    "open_id", "user_name", "is_man", "date_mode",
    "born_year", "born_month", "born_day", "born_hour", "born_minute",
    "know_time", "born_sheng", "born_shi", "born_qu",
)

# Attributes copied verbatim from a SaveUserRequest into DBYiXueUser.insert().
_YIXUE_USER_FIELDS = (
    "name", "isMan", "dateMode", "extra", "owner_id",
    "year", "month", "day", "hourMode", "hour", "minute",
    "areaMode", "sheng", "shi", "qu",
    "nongli_year", "nongli_month", "nongli_day",
    "req_nian_gan", "req_nian_zhi", "req_yue_gan", "req_yue_zhi",
    "req_ri_gan", "req_ri_zhi", "req_shi_gan", "req_shi_zhi",
    "rsp_nian_gan", "rsp_nian_zhi", "rsp_yue_gan", "rsp_yue_zhi",
    "rsp_ri_gan", "rsp_ri_zhi", "rsp_shi_gan", "rsp_shi_zhi",
)


def is_empty(s: str):
    """Return True when ``s`` is None or has length zero."""
    return s is None or len(s) == 0


def _reject_bad_token(req):
    """Validate ``req.token`` against the in-memory token of ``req.uid``.

    Returns a ready-to-send JsonResponse when the request must be rejected
    (code -100: no token supplied, code -200: token differs from the one held
    in memory) and None when the request may proceed.  When no token is held
    in memory for the uid, the supplied token is stored and accepted.
    """
    if req.token is None:
        return JsonResponse({"code": -100, "data": {}})
    mem_token = DataCenter.get_token(req.uid)
    if mem_token is None:
        # NOTE(review): any token is trusted here when memory holds none
        # (e.g. presumably after a process restart) - confirm this is intended.
        DataCenter.set_token(req.uid, req.token)
    elif mem_token != req.token:
        return JsonResponse({"code": -200, "data": {}})
    return None


def _fill_time_and_area(base_info, req):
    """Copy the hour/minute and area fields of ``req`` onto ``base_info``."""
    base_info.hour_mode = req.hourMode
    base_info.hour = req.hour
    base_info.minute = req.minute
    base_info.area_mode = req.areaMode
    base_info.sheng = req.sheng
    base_info.shi = req.shi
    base_info.qu = req.qu


def _owner_users_json(owner_id):
    """Return the JSON objects of every DBYiXueUser row owned by ``owner_id``."""
    return [row.to_json_object() for row in DBYiXueUser.query_by(owner_id=owner_id)]


# Create your views here.
class GetAccessToken(APIView):
    """Expose the current WeChat access token."""

    def get(self, request):
        return Response(tools.__get_access_token__())


class UpdateMenu(APIView):
    """Push the official-account menu to the WeChat menu-create API."""

    def post(self, request: Request):
        menu = WXMenu()
        menu.add_click_button("jinriyunshi", "jin_ri_yun_shi")
        tk = tools.__get_access_token__()
        print(tk)
        # The original URL literal started with a stray space.
        url = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token=" + tk
        print(url)
        # Serialized only for the debug print; the object itself is posted.
        body = json.dumps(menu.obj, ensure_ascii=False)
        print(body)
        rsp = tools.sync_post(url, menu.obj)
        print(rsp.text)
        return Response(rsp.text)


class RemoveMenu(APIView):
    """Placeholder view; no handler is implemented."""
    pass


class UserMsg(APIView):
    """Echo back a JSON user message after mapping it onto a WXUserMsg."""
    parser_classes = [GZHParser]

    def post(self, request: Request):
        obj = json.loads(request.data)
        bean = WXUserMsg()
        for key, value in obj.items():
            if key:
                setattr(bean, key, value)
        return Response(obj)


class UrlCheck(APIView):
    """WeChat server callback: URL verification (GET) and message push (POST)."""
    parser_classes = [GZHParser]

    def get(self, request: Request):
        """Answer the verification handshake by echoing ``echostr`` as an int.

        A missing or non-numeric ``echostr`` yields HTTP 400 instead of an
        unhandled exception.
        """
        # NOTE(review): the request signature (``signature`` / ``timestamp`` /
        # ``nonce`` query parameters, token "deanlai") is NOT verified; the
        # SHA-1 check that used to be sketched here was never enabled.
        echo_str = request.GET.get("echostr")
        try:
            echo_value = int(echo_str)
        except (TypeError, ValueError):
            return Response({"code": -1, "msg": "invalid echostr"}, status=400)
        return Response(echo_value)

    def post(self, request: Request):
        """Reply to a pushed user message with an XML message string."""
        obj = tools.xml_parser(request.data)
        bean = WXUserMsg.build_from(obj)
        content = ""
        try:
            if bean.Content == "今日运势":
                content = logic.get_lucky_day(bean.FromUserName)
        except Exception as e:
            # Best effort: on failure the reply is sent with empty content.
            print(e)
        print(bean)

        # The reply swaps sender and receiver of the incoming message.
        reply = WXUserMsg()
        reply.ToUserName = bean.FromUserName
        reply.FromUserName = bean.ToUserName
        reply.MsgType = bean.MsgType
        reply.CreateTime = int(time.time())
        reply.Content = content
        xml_data = tools.xml_build("xml", reply)
        xml_str = tools.xml_to_string(xml_data)
        print(xml_str)
        return Response(xml_str)


class ApplyUserInfo(APIView):
    """Register the birth information of an official-account user once."""

    def post(self, request: Request):
        bean = UserRequest.build_from(request.data)
        msg = "您的信息已记录,请退出页面后再次点击【今日运势】"
        code = 0
        if is_empty(bean.open_id):
            code = -100
            msg = "未获取到用户信息,请退出页面重试或联系客服"
        elif is_empty(bean.user_name):
            code = -1
            msg = "用户姓名未填写"
        elif DB_WX_GZH_User.check_exist(open_id=bean.open_id):
            code = -100
            msg = "用户信息已存在,如需修改请联系客服"
        else:
            DB_WX_GZH_User.insert(
                **{field: getattr(bean, field) for field in _GZH_USER_FIELDS}
            )
        return Response({"code": code, "msg": msg})


def jia_mi(s: str):
    """Base64-encode ``s`` (UTF-8).  This is an encoding, not encryption."""
    return base64.b64encode(s.encode("utf-8")).decode("utf-8")


def jie_mi(s: str):
    """Decode a Base64 string produced by ``jia_mi`` back to text."""
    return base64.b64decode(s).decode("utf-8")


class LoginSystem(APIView):
    """Account login bound to a device fingerprint.

    Flow:
      1. Look the user up by account and password; no match -> code -1.
      2. Derive ``cur_token`` from the current device fingerprint.
         - No fingerprint stored yet: store it and remember ``cur_token``.
         - Stored fingerprint yields a different token (device changed):
           if the user may change devices, store the new fingerprint, bump the
           change counter and replace the in-memory token so the old device's
           API calls fail; otherwise answer code -100.
         - Same token: same device, just refresh the in-memory token.
    """
    parser_classes = [JsonParser]

    def post(self, request):
        json_params = {'ensure_ascii': False}
        dt = json.loads(request.data)
        req = LoginRequest.build_from(dt)
        finger = req.userAgent + req.colorDepth + req.timezoneOffset + req.language
        # NOTE(review): the password is matched as sent, and the token is only
        # a Base64 form of the fingerprint - neither is a secret-safe scheme.
        user = DBSysUser.query_first_by(account=req.account, password=req.password)
        if user is None:
            # Wrong account or password.
            return JsonResponse({"code": -1, "data": {}}, json_dumps_params=json_params)

        cur_token = jia_mi(finger)
        if user.finger is None:
            # First login: remember the device.
            DBSysUser.update(conditions=(DBSysUser.id == user.id), values={"finger": finger})
        elif jia_mi(user.finger) != cur_token:
            # Login from a different device.
            if user.allow_change_dev is not True:
                return JsonResponse({"code": -100, "data": {}}, json_dumps_params=json_params)
            # Tolerate an unset counter and report the updated value.
            ct = (user.dev_change or 0) + 1
            DBSysUser.update(conditions=(DBSysUser.id == user.id),
                             values={"finger": finger, "dev_change": ct})
            print("用户:", user.name, " 更换了登录设备,更换次数:", ct)

        DataCenter.set_token(user.id, cur_token)
        rsp = {
            "name": user.name,
            "token": cur_token,
            "uid": user.id
        }
        return JsonResponse({"code": 200, "data": rsp}, json_dumps_params=json_params)


class GetSiZhu(APIView):
    """Compute the four pillars from a solar (Gregorian) birth date."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        '''
        request:{
            token:str
            isMan:bool
            year:int
            month:int
            day:int
            hourMode:int 2=unknown
            hour:int
            minute:int
            areaMode:int
            sheng:str province
            shi:str city
            qu:str district
        }
        Accepts solar (Gregorian) dates only.
        '''
        rsp = {}
        tm1 = datetime.datetime.now()
        print("receive request", tm1)
        try:
            dt = json.loads(request.data)
            req = GetSiZhuRequest.build_from(dt)
            denied = _reject_bad_token(req)
            if denied is not None:
                return denied
            req.fix_data()
            result = logic.calc_si_zhu(req.year, req.month, req.day, req.hour, req.minute,
                                       req.sheng, req.shi, req.qu)
            tm13 = datetime.datetime.now()
            print("calc sizhu use:", tm13 - tm1)
            if result is not None:
                bazi = logic.build_bazi(result, req.isMan)
                bazi.base_info = BaseInfo()
                bazi.base_info.date_mode = 0
                bazi.base_info.year = req.year
                bazi.base_info.month = req.month
                bazi.base_info.day = req.day
                _fill_time_and_area(bazi.base_info, req)
                if req.hourMode == 2:
                    # Birth hour unknown: do not report a solar time.
                    bazi.tai_yang_shi = None
                rsp = bazi.to_response()
        except Exception as e:
            print(e)
            traceback.print_exc()
        return JsonResponse({"code": 200, "data": rsp})


class GetSiZhuByNongLi(APIView):
    """Compute the four pillars from a lunar birth date."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        '''
        request:{
            uid:int
            token:str
            isMan:bool
            year:int
            month:string lunar month (note: a string)
            day:string lunar day (note: a string)
            hourMode:int 2=unknown
            hour:int
            minute:int
            areaMode:int
            sheng:str province
            shi:str city
            qu:str district
        }
        Accepts lunar dates only.
        '''
        rsp = {}
        try:
            dt = json.loads(request.data)
            req = GetSiZhuByNongLiRequest.build_from(dt)
            denied = _reject_bad_token(req)
            if denied is not None:
                return denied
            req.fix_data()
            xinli = logic.calc_nong_li(req.year, req.month, req.day)
            print("对应阳历:", xinli)
            # Late lunar months fall in the following solar year, so the
            # converted year must be used; fall back to the requested year if
            # the conversion result does not expose one.
            solar_year = getattr(xinli, "year", req.year)
            result = logic.calc_si_zhu(solar_year, xinli.month, xinli.day, req.hour, req.minute,
                                       req.sheng, req.shi, req.qu)
            if result is not None:
                bazi = logic.build_bazi(result, req.isMan)
                bazi.base_info = BaseInfo()
                bazi.base_info.date_mode = 1
                bazi.base_info.nongli_year = req.year
                bazi.base_info.nongli_month = req.month
                bazi.base_info.nongli_day = req.day
                _fill_time_and_area(bazi.base_info, req)
                if req.hourMode == 2:
                    # Birth hour unknown: do not report a solar time.
                    bazi.tai_yang_shi = None
                rsp = bazi.to_response()
        except Exception as e:
            print(e)
            traceback.print_exc()
        return JsonResponse({"code": 200, "data": rsp})


class GetSiZhuByTexts(APIView):
    """Build the chart directly from the eight stem/branch characters."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        '''
        request:{
            uid:int
            token:str
            isMan:bool
            niangan:str
            nianzhi:str
            yuegan:str
            yuezhi:str
            rigan:str
            rizhi:str
            shigan:str
            shizhi:str
            hourMode:int 1=unknown
        }
        '''
        rsp = {}
        code = 200
        try:
            dt = json.loads(request.data)
            req = GetSiZhuByTextsRequest.build_from(dt)
            denied = _reject_bad_token(req)
            if denied is not None:
                return denied
            sizhu_checker = logic.check_date_by_texts(req.niangan, req.nianzhi, req.yuegan, req.yuezhi,
                                                      req.rigan, req.rizhi, req.shigan, req.shizhi)
            result = Models.SiZhuResult()
            result.nian_gan = req.niangan
            result.nian_zhi = req.nianzhi
            result.yue_gan = req.yuegan
            result.yue_zhi = req.yuezhi
            result.ri_gan = req.rigan
            result.ri_zhi = req.rizhi
            result.shi_gan = req.shigan
            result.shi_zhi = req.shizhi
            result.sheng = "未知地区"
            result.shi = "-"
            result.qu = "-"
            result.tai_yang_shi = None
            if sizhu_checker.error:
                # The checker's own payload is returned with code -1.  The
                # request's ignoreError flag is currently not acted upon.
                code = -1
                rsp = sizhu_checker.to_response()
            else:
                if req.hourMode == 0:
                    result.tai_yang_shi = datetime.datetime.strptime(
                        sizhu_checker.adv_date[0], "%Y-%m-%d %H:%M")
                else:
                    # No hour pillar: use midnight of the matched date.
                    result.tai_yang_shi = datetime.datetime.strptime(
                        sizhu_checker.adv_date[0] + " 0:00", "%Y-%m-%d %H:%M")
                    result.shi_gan = "无"
                    result.shi_zhi = "无"
                bazi = logic.build_bazi(result, req.isMan)
                bazi.base_info = BaseInfo()
                bazi.base_info.date_mode = 2
                bazi.base_info.nian_gan = req.niangan
                bazi.base_info.nian_zhi = req.nianzhi
                bazi.base_info.yue_gan = req.yuegan
                bazi.base_info.yue_zhi = req.yuezhi
                bazi.base_info.ri_gan = req.rigan
                bazi.base_info.ri_zhi = req.rizhi
                bazi.base_info.shi_gan = req.shigan
                bazi.base_info.shi_zhi = req.shizhi
                if req.hourMode == 1:
                    bazi.tai_yang_shi = None
                rsp = bazi.to_response()
        except Exception as e:
            print(e)
            traceback.print_exc()
        return Response({"code": code, "data": rsp})


class CalcNongLi(APIView):
    """Convert a lunar date through ``logic.calc_nong_li``."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        '''
        request:{
            year:int
            month:str lunar month
            day:str lunar day
        }
        '''
        result = None
        try:
            dt = json.loads(request.data)
            req = CalcNongLiRequest.build_from(dt)
            result = logic.calc_nong_li(req.year, req.month, req.day)
        except Exception as e:
            print(e)
            traceback.print_exc()
        return Response({"code": 200, "data": result})


class SaveUser(APIView):
    """Store a chart record and return all records of its owner."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        rsp = []
        try:
            dt = json.loads(request.data)
            req = SaveUserRequest.build_from(dt)
            DBYiXueUser.insert(
                **{field: getattr(req, field) for field in _YIXUE_USER_FIELDS}
            )
            rsp = _owner_users_json(req.owner_id)
        except Exception as e:
            print(e)
            traceback.print_exc()
        return Response({"code": 200, "data": rsp})


class LoadUser(APIView):
    """Return all chart records of an owner."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        rsp = []
        try:
            dt = json.loads(request.data)
            req = LoadUserRequest.build_from(dt)
            rsp = _owner_users_json(req.owner_id)
        except Exception as e:
            print(e)
            traceback.print_exc()
        return Response({"code": 200, "data": rsp})


class DeleteUser(APIView):
    """Delete one chart record and return the owner's remaining records."""
    parser_classes = [JsonParser]

    def post(self, request: Request):
        rsp = []
        try:
            dt = json.loads(request.data)
            req = DeleteUserRequest.build_from(dt)
            DBYiXueUser.delete_by(id=req.user_id, owner_id=req.owner_id)
            rsp = _owner_users_json(req.owner_id)
        except Exception as e:
            print(e)
            traceback.print_exc()
        return Response({"code": 200, "data": rsp})