I have the following code, where I ask questions against my own context (documents) and can get the output back in a streaming format. However, I am now creating an API for the same functionality and am unable to reproduce similar results.

from langchain import OpenAI
from types import FunctionType
from llama_index import ServiceContext, GPTVectorStoreIndex, LLMPredictor, PromptHelper, SimpleDirectoryReader, load_index_from_storage
import sys
import os
import time 
from llama_index.response.schema import StreamingResponse
import uvicorn 
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel



os.environ["OPENAI_API_KEY"] = "your key here" # gpt 3.5 turbo


app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

from llama_index import StorageContext, load_index_from_storage, ServiceContext
from langchain.chat_models import ChatOpenAI

def construct_index(directory_path):
    max_input_size = 4096
    num_outputs = 5000
    max_chunk_overlap = 256
    chunk_size_limit = 3900
    file_metadata = lambda x : {"filename": x}
    reader = SimpleDirectoryReader(directory_path, file_metadata=file_metadata)
    
    documents = reader.load_data()

    prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
    llm_predictor = LLMPredictor(llm=OpenAI(temperature=0, model_name="gpt-3.5-turbo", max_tokens=num_outputs))
    
    service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)

    index = GPTVectorStoreIndex.from_documents(
        documents=documents, service_context = service_context
    )
    
    index.storage_context.persist("./jsons/contentstack_llm")
    return index
   
def get_index():
    max_input_size = 4000
    num_outputs = 1024
    max_chunk_overlap = 512
    chunk_size_limit = 3900
    prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
    llm_predictor = LLMPredictor(llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo", max_tokens=num_outputs, streaming = True))
    
    service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)
    
    return service_context  

# construct_index("./docs")
storage_context = StorageContext.from_defaults(persist_dir="./jsons/contentstack_llm")
service_context = get_index()
index = load_index_from_storage(storage_context, service_context = service_context)

query_engine = index.as_query_engine(streaming = True)
class Item(BaseModel):
    input_text: str

@app.post("/question_answering")
async def create_item(item: Item):
    input_sentence = item.input_text
    response = query_engine.query(input_sentence)
    links = []
    return StreamingResponse(query_engine.query(input_sentence).response_gen)
        

On executing the code above I get the following error: TypeError: cannot pickle 'generator' object. Is there any way to fix this in FastAPI? I can stream the answer in my console, but I want to stream it between my API and its output. Also, if not FastAPI, can we do something similar in Flask?

Accepted answer

For a quick fix, I used Python's yield to wrap the token generator in a small async generator and returned it through FastAPI's StreamingResponse (note the import is now fastapi.responses.StreamingResponse, not the llama_index class of the same name), and changed the code as follows:

# from gpt_index import SimpleDirectoryReader, GPTListIndex,readers, GPTSimpleVectorIndex, LLMPredictor, PromptHelper
from langchain import OpenAI
import asyncio 
from types import FunctionType
from llama_index import ServiceContext, GPTVectorStoreIndex, LLMPredictor, PromptHelper, SimpleDirectoryReader, load_index_from_storage
import sys
import os
import time 
# from llama_index.response.schema import StreamingResponse
from fastapi.responses import StreamingResponse
import uvicorn 
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel



os.environ["OPENAI_API_KEY"] = "your key here" # gpt 3.5 turbo


app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

from llama_index import StorageContext, load_index_from_storage, ServiceContext
from langchain.chat_models import ChatOpenAI


def construct_index(directory_path):
    max_input_size = 4096
    num_outputs = 5000
    max_chunk_overlap = 256
    chunk_size_limit = 3900

    print("*"*5, "Documents parsing initiated", "*"*5)
    file_metadata = lambda x : {"filename": x}
    reader = SimpleDirectoryReader(directory_path, file_metadata=file_metadata)
    print(reader)
    documents = reader.load_data()
    print("*"*5, "Documents parsing done", "*"*5)
    
    print(documents[0].extra_info)
    print(documents[0].doc_id)
    
    print()
    # nodes = parser.get_nodes_from_documents(documents)
    # index = GPTVectorStoreIndex(nodes)
    prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
    llm_predictor = LLMPredictor(llm=OpenAI(temperature=0, model_name="gpt-3.5-turbo", max_tokens=num_outputs))
    
    service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)

    # print("*"*5, "Index creation initiated", "*"*5)
    index = GPTVectorStoreIndex.from_documents(
        documents=documents, service_context = service_context
    )
    # print("*"*5, "Index created", "*"*5)
    index.storage_context.persist("./jsons/contentstack_llm")
    return index
    


def get_index():
    max_input_size = 4000
    num_outputs = 1024
    max_chunk_overlap = 512
    chunk_size_limit = 3900
    prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
    llm_predictor = LLMPredictor(llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo", max_tokens=num_outputs, streaming = True))
    
    service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)
    
    return service_context  

# construct_index("./documents")
storage_context = StorageContext.from_defaults(persist_dir="./your_directory")
service_context = get_index()
index = load_index_from_storage(storage_context, service_context = service_context)

query_engine = index.as_query_engine(streaming = True)


async def astreamer(generator):
    try:
        # Yield each token from the synchronous generator, then briefly
        # hand control back to the event loop so the chunk is flushed
        # to the client instead of being buffered.
        for i in generator:
            yield i
            await asyncio.sleep(.1)
    except asyncio.CancelledError:
        # The client disconnected before the stream finished
        print('cancelled')

class Item(BaseModel):
    input_text: str

@app.post("/question_answering")
async def create_item(item: Item):
    input_sentence = item.input_text
    response = query_engine.query(input_sentence)    
    return StreamingResponse(astreamer(response.response_gen), media_type="text/event-stream")
        

@app.get("/")
@app.get("/health_check")
async def health_check():
    return "ok"
    
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
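
To consume the stream on the client side, here is a minimal sketch using the requests library (the question text and URL are placeholders; it assumes the server above is running locally on port 8000):

import requests

# stream=True keeps the connection open and lets us read chunks as they arrive
with requests.post(
    "http://localhost:8000/question_answering",
    json={"input_text": "your question here"},
    stream=True,
) as r:
    for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, end="", flush=True)

As for the Flask part of the question: yes, Flask can stream in much the same way, since a view can return a Response wrapped around a generator. A rough sketch of the idea (not the original author's code; it reuses the same query_engine defined above):

from flask import Flask, Response, request

flask_app = Flask(__name__)

@flask_app.route("/question_answering", methods=["POST"])
def question_answering():
    input_sentence = request.get_json()["input_text"]
    streaming_response = query_engine.query(input_sentence)

    def generate():
        # yield each token as soon as the query engine produces it
        for token in streaming_response.response_gen:
            yield token

    return Response(generate(), mimetype="text/event-stream")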
