"""FastAPI application module.

Defines the ``app`` instance, two OpenAI chat helpers (``ai_stream`` and
``ai_normal``), the ``/api/getSiZhuInfo`` calculation endpoint, and the
login / saved-user endpoints backed by ``DBCustomUser`` and ``DBUserInfo``.
"""
import asyncio
import logging
import os
from typing import AsyncIterator, Optional

import openai
from fastapi import FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from openai import OpenAI
from starlette.middleware.cors import CORSMiddleware

from DBTools import DBUserInfo, DBCustomUser
from LocalModel import CustomLogin, SaveUser, QueryUser, DeleteUser
from logic import *

# The OpenAI key is read from the environment instead of being committed to
# source control. When the variable is unset this is None and the OpenAI
# client raises a descriptive error on construction.
# NOTE(review): a secret key used to be hard-coded here; it should be treated
# as leaked and revoked.
API_KEY = os.environ.get("OPENAI_API_KEY")

# Chat model shared by ai_stream() and ai_normal().
CHAT_MODEL = "gpt-3.5-turbo"

# Maximum number of DBUserInfo rows one customer may have before saveUser
# refuses to insert another.
MAX_SAVED_USERS = 100

app = FastAPI()

# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive CORS policy; confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

logging.basicConfig(
    level=logging.INFO,  # log level
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',  # record format
    datefmt='%Y-%m-%d %H:%M:%S',  # timestamp format
    filename='app.log',  # log file location
    filemode='a'  # 'a' appends (the default); 'w' would overwrite the file
)


async def ai_stream(content: str) -> AsyncIterator[str]:
    """Stream a chat completion for *content*, one text fragment per item.

    *content* is sent as a single ``system`` message. Every non-empty delta
    received from the API is yielded with a trailing newline. The async
    client is used so that waiting on the network does not block the event
    loop.

    Any exception raised while opening or reading the stream is logged and
    reported to the consumer as a final ``"Error: ..."`` item instead of
    propagating.
    """
    try:
        client = openai.AsyncOpenAI(api_key=API_KEY)
        stream = await client.chat.completions.create(
            model=CHAT_MODEL,
            stream=True,
            messages=[
                {"role": "system", "content": content}
            ],
        )
        async for chunk in stream:
            # Some chunks carry no choices at all; indexing [0] would raise.
            if not chunk.choices:
                continue
            text = chunk.choices[0].delta.content
            if text:
                yield text + "\n"
    except Exception as e:
        # Boundary of the stream: the consumer cannot receive an exception
        # once streaming has started, so log it and emit it as text.
        logging.exception("ai_stream failed")
        yield f"Error: {e}\n"


def ai_normal(content: str) -> dict:
    """Request a non-streaming chat completion for *content*.

    *content* is sent as a single ``system`` message. Returns
    ``{"msg": <text of the first choice>}``. This call is synchronous and
    blocks until the API responds.
    """
    client = OpenAI(api_key=API_KEY)
    completion = client.chat.completions.create(
        model=CHAT_MODEL,
        messages=[
            {"role": "system", "content": content},
        ],
    )
    return {"msg": completion.choices[0].message.content}


# NOTE: ai_stream() and ai_normal() are not wired to any route in this file.


@app.post("/api/getSiZhuInfo")
async def getSiZhuInfo(request: SiZhuInfoRequest):
    """Build a ``BaZi`` object from the request and return it JSON-encoded.

    When ``request.mode == 2`` a start datetime is first computed with
    ``calc_date_of_sizhu`` and later stored, as a string, in
    ``bazi.taiyangshi``.
    """
    startDtm = None
    if request.mode == 2:
        startDtm = calc_date_of_sizhu(request)

    bazi = BaZi(request)
    dc = DataCenter(bazi)
    if startDtm is not None:
        bazi.taiyangshi = str(startDtm)
    fill_sizhu_in_bazi(bazi, dc)
    return jsonable_encoder(bazi)


@app.post("/api/customLogin")
async def customLogin(request: CustomLogin):
    """Look up a ``DBCustomUser`` by user name and password.

    Returns ``msg == "ok"`` with the stored ``name`` and ``sexy`` values when
    a matching row exists, otherwise ``msg == "error"`` with both set to
    ``None``.
    """
    logging.info("login")
    # NOTE(review): the password is matched as sent by the client, which
    # suggests it is stored unhashed — confirm and consider hashing.
    account = DBCustomUser.query_first_by(user=request.user, psd=request.psd)
    if account is None:
        return {"msg": "error", "name": None, "sexy": None}
    return {"msg": "ok", "name": account.name, "sexy": account.sexy}


@app.post("/api/saveUser")
async def saveUser(request: SaveUser):
    """Insert a new ``DBUserInfo`` row for ``request.customer``.

    Refuses (``state == -1``) when the customer already has
    ``MAX_SAVED_USERS`` or more rows; otherwise inserts the row with the
    current local time as ``joinTime`` and returns ``state == 200``.
    """
    # NOTE(review): this count is not restricted to enabled=1, so rows
    # soft-deleted via deleteUser still count towards the limit — confirm.
    existing = DBUserInfo.query_by(customer=request.customer)
    if len(existing) >= MAX_SAVED_USERS:
        return {"msg": "超过可以保存的用户上限,请联系管理员", "state": -1}

    # Imported locally so the name cannot be shadowed by ``from logic import *``.
    from datetime import datetime

    now = datetime.now()
    DBUserInfo.insert(
        name=request.name,
        beizhu=request.beizhu,
        man=request.isMan,
        leibie=request.leibie,
        year=request.year,
        month=request.month,
        day=request.day,
        hour=request.hour,
        minute=request.minute,
        sheng=request.sheng,
        shi=request.shi,
        qu=request.qu,
        niangan=request.niangan,
        nianzhi=request.nianzhi,
        yuegan=request.yuegan,
        yuezhi=request.yuezhi,
        rigan=request.rigan,
        rizhi=request.rizhi,
        shigan=request.shigan,
        shizhi=request.shizhi,
        customer=request.customer,
        joinTime=now.strftime("%Y-%m-%d %H:%M:%S"),
        enabled=1,
    )
    return {"msg": "保存用户信息成功", "state": 200}


def __build_user_object1(dt: DBUserInfo) -> dict:
    """Convert a ``DBUserInfo`` row into the dict shape returned to clients.

    The ``man`` column is exposed as the boolean ``isMan``; every other field
    is copied under its own name.
    """
    return {
        "id": dt.id,
        "name": dt.name,
        "beizhu": dt.beizhu,
        "isMan": bool(dt.man),
        "leibie": dt.leibie,
        "year": dt.year,
        "month": dt.month,
        "day": dt.day,
        "hour": dt.hour,
        "minute": dt.minute,
        "sheng": dt.sheng,
        "shi": dt.shi,
        "qu": dt.qu,
        "niangan": dt.niangan,
        "nianzhi": dt.nianzhi,
        "yuegan": dt.yuegan,
        "yuezhi": dt.yuezhi,
        "rigan": dt.rigan,
        "rizhi": dt.rizhi,
        "shigan": dt.shigan,
        "shizhi": dt.shizhi,
        "customer": dt.customer,
        "joinTime": dt.joinTime,
    }


def __do_query_user(customer: str, filter: Optional[str]) -> list:
    """Return the enabled users of *customer* as a list of dicts.

    When *filter* is ``None`` every enabled row is returned; otherwise only
    rows whose ``name`` contains *filter* as a substring.
    """
    rows = DBUserInfo.query_by(customer=customer, enabled=1)
    return [
        __build_user_object1(row)
        for row in rows
        if filter is None or filter in row.name
    ]


@app.post("/api/queryUser")
async def queryUser(request: QueryUser):
    """Return the customer's enabled users, optionally filtered by name."""
    data = __do_query_user(request.customer, request.filter)
    return jsonable_encoder(data)


@app.post("/api/deleteUser")
async def deleteUser(request: DeleteUser):
    """Soft-delete the user with ``request.id`` and return the remaining users.

    The row is not removed; its ``enabled`` flag is set to 0. An unknown id
    is ignored. The response is the unfiltered list of enabled users of
    ``request.customer``.
    """
    # NOTE(review): the row is looked up by id only; it is not checked that
    # it belongs to request.customer — confirm whether that is acceptable.
    ss = DBUserInfo.new_session()
    try:
        record = ss.query(DBUserInfo).filter_by(id=request.id).first()
        if record is not None:
            record.enabled = 0
            ss.commit()
    finally:
        # Always release the session, even if the query or commit raised.
        ss.close()
    return __do_query_user(request.customer, None)


@app.post("/api/test")
async def test(request: Request):
    """Placeholder endpoint that always answers with the JSON string "ok"."""
    return JSONResponse(content="ok")


@app.get("/api/test2")
async def test2():
    """Placeholder endpoint that returns a fixed greeting."""
    return {"message": "Hello World"}