Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion DataSource/Aws/setAwsConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ export const setAWSConnection = async (formData: FormData) => {
} : undefined,
metadata: config.data.metadata,
isConfigSet: true,
isSyncing: true,
limitPages:config.data.pageLimit,
limitFiles: config.data.fileLimit,
}).where(eq(connections.id, config.data.connectionId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const UpdateConfigDirect = ({ connection, status }: { connection: Connect

return (<Dialog open={open} onOpenChange={setOpen} >
<DialogTrigger asChild>
<Button data-test="btn-config" size='sm' variant={connection.isConfigSet ? 'ghost' : 'default'} disabled={status === 'PROCESSING' || (!status && connection.isSyncing)}>
<Button data-test="btn-config" size='sm' variant={connection.isConfigSet ? 'ghost' : 'default'} disabled={status === 'PROCESSING' || (!status && !!connection.jobId)}>
<Settings2 />
Configure
</Button>
Expand Down
20 changes: 13 additions & 7 deletions DataSource/DirectUpload/setDirectUploadConnection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { databaseDrizzle } from "@/db";
import { connections, processedFiles } from "@/db/schemas/connections";
import { qdrant_collection_name, qdrantCLient } from "@/qdrant";
import { tryAndCatch } from "@/lib/try-catch";
import { qdrant_collection_name, qdrantClient } from "@/qdrant";
import { and, eq } from "drizzle-orm";
import { z } from "zod";

Expand Down Expand Up @@ -52,6 +53,7 @@ const directUploadConfig = z.object({
})

const updateDirectUploadConfig = z.object({
userId: z.string().min(5),
connectionId: z.string().min(5),
pageLimit: z.string().nullable().transform((str, ctx): number | null => {
try {
Expand Down Expand Up @@ -91,6 +93,7 @@ const updateDirectUploadConfig = z.object({

export const updateDirectUploadConnection = async (formData: FormData) => {
const config = updateDirectUploadConfig.safeParse({
userId: formData.get("userId"),
connectionId: formData.get("connectionId"),
identifier: formData.get("identifier"),
metadata: formData.get("metadata") || "{}",
Expand All @@ -110,7 +113,7 @@ export const updateDirectUploadConnection = async (formData: FormData) => {
throw new Error(`Validation errors - ${errors}`)
}

const connectionChunksIds: { chunksIds: string[] }[] = [];
const connectionChunksIds: { chunksIds: string[], name: string }[] = [];

await databaseDrizzle
.update(connections)
Expand All @@ -125,14 +128,18 @@ export const updateDirectUploadConnection = async (formData: FormData) => {
.where(and(
eq(processedFiles.connectionId, config.data.connectionId),
eq(processedFiles.name, fileName)
)).returning({ chunksIds: processedFiles.chunksIds })
)).returning({ chunksIds: processedFiles.chunksIds, name: processedFiles.name })
connectionChunksIds.push(...files)
}

for (const { chunksIds } of connectionChunksIds) {
await qdrantCLient.delete(qdrant_collection_name, {
for (const { chunksIds, name } of connectionChunksIds) {
await tryAndCatch(qdrantClient.delete(qdrant_collection_name, {
points: chunksIds,
})
filter: {
must: [{ "key": "_document_id", "match": { value: name } },
{ key: "_userId", match: { value: config.data.userId } }]
}
}))
}

const files = config.data.files.map(async (file) => ({
Expand Down Expand Up @@ -188,7 +195,6 @@ export const setDirectUploadConnection = async (formData: FormData) => {
service: 'DIRECT_UPLOAD',
metadata: metadata,
isConfigSet: true,
isSyncing: true,
limitPages: pageLimit,
limitFiles: fileLimit,
}).returning({ id: connections.id })
Expand Down
1 change: 0 additions & 1 deletion DataSource/Dropbox/setDropboxConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export const setDropboxConnection = async (formData: FormData) => {
} : undefined,
metadata: config.data.metadata,
isConfigSet: true,
isSyncing: true,
limitFiles: config.data.fileLimit,
limitPages: config.data.pageLimit,
}).where(eq(connections.id, config.data.connectionId))
Expand Down
1 change: 0 additions & 1 deletion DataSource/GoogleDrive/setGoogleDriveConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export const setGoogleDriveConnection = async (formData: FormData) => {
limitPages: config.data.pageLimit,
limitFiles: config.data.fileLimit,
isConfigSet: true,
isSyncing: true,
}).where(eq(connections.id, config.data.connectionId))

return {
Expand Down
16 changes: 11 additions & 5 deletions actions/connctions/delete/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import { authOptions } from "@/auth";
import { databaseDrizzle } from "@/db";
import { connections, processedFiles } from "@/db/schemas/connections";
import { tryAndCatch } from "@/lib/try-catch";
import { fromErrorToFormState, toFormState } from "@/lib/zodErrorHandle";
import { qdrant_collection_name, qdrantCLient } from "@/qdrant";
import { qdrant_collection_name, qdrantClient } from "@/qdrant";
import { eq } from "drizzle-orm";
import { getServerSession } from "next-auth";
import { revalidatePath } from "next/cache";
Expand All @@ -26,14 +27,19 @@ export async function deleteConnectionConfig(_: FormState, formData: FormData) {
})

const connectionChunksIds = await databaseDrizzle
.select({ chunksIds: processedFiles.chunksIds })
.select({ chunksIds: processedFiles.chunksIds, name: processedFiles.name })
.from(processedFiles)
.where(eq(processedFiles.connectionId, id))

for (const { chunksIds } of connectionChunksIds) {
await qdrantCLient.delete(qdrant_collection_name, {
for (const { chunksIds, name } of connectionChunksIds) {
await tryAndCatch(qdrantClient.delete(qdrant_collection_name, {
points: chunksIds,
})
filter: {
must: [
{ key: "_document_id", "match": { value: name } },
{ key: "_userId", match: { value: session.user.id } }]
}
}))
}

await databaseDrizzle
Expand Down
1 change: 0 additions & 1 deletion actions/connctions/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ export const syncConnectionConfig = async (_: FormState, formData: FormData) =>
links: []
})
await databaseDrizzle.update(connections).set({
isSyncing: true,
jobId: jobId
}).where(eq(connections.id, connectionId));

Expand Down
4 changes: 2 additions & 2 deletions app/api/chunks/[...ids]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { NextRequest, NextResponse } from "next/server";
import { APIError } from "@/lib/APIError";
import { databaseDrizzle } from "@/db";
import { arrayContains } from "drizzle-orm";
import { qdrant_collection_name, qdrantCLient } from "@/qdrant";
import { qdrant_collection_name, qdrantClient } from "@/qdrant";

type Params = {
params: Promise<{
Expand Down Expand Up @@ -62,7 +62,7 @@ export async function GET(request: NextRequest, { params }: Params) {
)
}

const { data: chunks, error: retrieveError } = await tryAndCatch(qdrantCLient.retrieve(qdrant_collection_name, {
const { data: chunks, error: retrieveError } = await tryAndCatch(qdrantClient.retrieve(qdrant_collection_name, {
ids: chunksIds,
with_payload: true,
}))
Expand Down
21 changes: 13 additions & 8 deletions app/api/connections/[id]/files/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { databaseDrizzle } from "@/db"
import { processedFiles } from "@/db/schemas/connections"
import { checkAuth } from "@/lib/api_key"
import { tryAndCatch } from "@/lib/try-catch"
import { qdrant_collection_name, qdrantCLient } from "@/qdrant"
import { qdrant_collection_name, qdrantClient } from "@/qdrant"
import { and, eq } from "drizzle-orm"
import { NextRequest, NextResponse } from "next/server"
import { APIError } from "openai"
Expand Down Expand Up @@ -80,12 +80,11 @@ export async function DELETE(request: NextRequest, { params }: Params) {
}

const { id } = await params
const connectionChunksIds: { chunksIds: string[] }[] = [];
const connectionChunksIds: { chunksIds: string[], name: string }[] = [];
const connection = await databaseDrizzle.query.connections.findFirst({
where: (c, ops) => ops.and(ops.eq(c.userId, userId), ops.eq(c.id, id)),
columns: {
id: true,
isSyncing: true,
jobId: true,
},
})
Expand All @@ -97,7 +96,7 @@ export async function DELETE(request: NextRequest, { params }: Params) {
}, { status: 403 })
}

if (connection.isSyncing || connection.jobId) {
if (connection.jobId) {
return NextResponse.json(
{
code: 'processing_in_progress',
Expand All @@ -117,7 +116,7 @@ export async function DELETE(request: NextRequest, { params }: Params) {
.where(and(
eq(processedFiles.connectionId, id),
eq(processedFiles.name, fileName)
)).returning({ chunksIds: processedFiles.chunksIds }))
)).returning({ chunksIds: processedFiles.chunksIds, name: processedFiles.name }))

if (error) {
return NextResponse.json({
Expand All @@ -128,10 +127,16 @@ export async function DELETE(request: NextRequest, { params }: Params) {
connectionChunksIds.push(...data)
}

for (const { chunksIds } of connectionChunksIds) {
const { error } = await tryAndCatch(qdrantCLient.delete(qdrant_collection_name, {
for (const { chunksIds, name } of connectionChunksIds) {
const { error } = await tryAndCatch(qdrantClient.delete(qdrant_collection_name, {
points: chunksIds,
wait: isWait
wait: isWait,
filter: {
must: [
{ key: "_document_id", match: { value: name } },
{ key: "_userId", match: { value: userId } }
]
}
}))
if (error) {
return NextResponse.json({
Expand Down
22 changes: 14 additions & 8 deletions app/api/connections/[id]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { APIError } from "@/lib/APIError";
import { databaseDrizzle } from "@/db";
import { eq } from "drizzle-orm";
import { connections } from "@/db/schemas/connections";
import { qdrant_collection_name, qdrantCLient } from "@/qdrant";
import { qdrant_collection_name, qdrantClient } from "@/qdrant";
import { setConnectionToProcess } from "@/fileProcessors/connectors";
import { connectionProcessFiles, directProcessFiles } from "@/fileProcessors";
import { addToProcessFilesQueue } from "@/workers/queues/jobs/processFiles.job";
Expand Down Expand Up @@ -109,12 +109,12 @@ export async function DELETE(request: NextRequest, { params }: Params) {
where: (conn, ops) => ops.and(ops.eq(conn.id, id), ops.eq(conn.userId, userId)),
columns: {
jobId: true,
isSyncing: true,
},
with: {
files: {
columns: {
chunksIds: true
chunksIds: true,
name: true,
}
}
}
Expand All @@ -139,7 +139,7 @@ export async function DELETE(request: NextRequest, { params }: Params) {
)
}

if (conn.isSyncing || conn.jobId) {
if (conn.jobId) {
return NextResponse.json(
{
code: 'processing_in_progress',
Expand All @@ -148,10 +148,16 @@ export async function DELETE(request: NextRequest, { params }: Params) {
{ status: 409 }
)
}
for (const { chunksIds } of conn.files) {
await tryAndCatch(qdrantCLient.delete(qdrant_collection_name, {
for (const { chunksIds, name } of conn.files) {
await tryAndCatch(qdrantClient.delete(qdrant_collection_name, {
points: chunksIds,
wait: wait === "true",
filter: {
must: [
{ key: "_document_id", match: { value: name } },
{ key: "_userId", match: { value: userId } },
]
}
}))
}

Expand Down Expand Up @@ -190,9 +196,9 @@ export async function PUT(request: NextRequest, { params }: Params) {
const { data: conn, error: queryError } = await tryAndCatch(databaseDrizzle.query.connections.findFirst({
where: (conn, ops) => ops.and(ops.eq(conn.id, id), ops.eq(conn.userId, userId)),
columns: {
isSyncing: true,
service: true,
folderName: true,
jobId: true,
}
}))
if (queryError) {
Expand All @@ -215,7 +221,7 @@ export async function PUT(request: NextRequest, { params }: Params) {
)
}

if (conn.isSyncing) {
if (conn.jobId) {
return NextResponse.json(
{
code: 'already_syncing',
Expand Down
4 changes: 2 additions & 2 deletions app/api/connections/[id]/status/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export async function GET(request: NextRequest, { params }: Params) {
where: (conn, ops) => ops.and(ops.eq(conn.userId, userId!), ops.eq(conn.id, id)),
columns: {
isConfigSet: true,
isSyncing: true,
jobId:true,
},
}))
if (queryError) {
Expand All @@ -53,7 +53,7 @@ export async function GET(request: NextRequest, { params }: Params) {

const response = {
status: conn.isConfigSet ? 'ready' : 'config_missing',
isSyncing: conn.isSyncing
isSyncing: !!conn.jobId
}

return NextResponse.json(response, { status: 200 });
Expand Down
6 changes: 3 additions & 3 deletions app/api/connections/[id]/sync/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ export async function POST(request: NextRequest, { params }: Params) {
id: true,
service: true,
isConfigSet: true,
isSyncing: true,
metadata: true
metadata: true,
jobId:true,
},
with: {
files: {
Expand Down Expand Up @@ -75,7 +75,7 @@ export async function POST(request: NextRequest, { params }: Params) {
{ status: 400 },
)
}
if (conn.isSyncing) {
if (conn.jobId) {
return NextResponse.json(
{
code: 'already_syncing',
Expand Down
15 changes: 10 additions & 5 deletions app/api/retrievals/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { apiKeys, users } from '@/db/schemas/users';
import { hashApiKey } from '@/lib/api_key';
import { Plans } from '@/lib/Plans';
import { expandQuery, generateHypotheticalAnswer, vectorizeText } from '@/openAi';
import { qdrant_collection_name, qdrantCLient } from '@/qdrant';
import { qdrant_collection_name, qdrantClient } from '@/qdrant';
import { RetrievalFilter } from '@/validations/retrievalsFilteringSchema'
import { and, eq, sql } from 'drizzle-orm';
import { getServerSession } from 'next-auth';
Expand Down Expand Up @@ -105,17 +105,22 @@ export async function POST(request: NextRequest) {
`Please upgrade to increase your limit.`
}, {
status: 429,
headers: { "Retry-After": "3600" }
headers: { "Retry-After": "3600" }
});
}

const { query, filter, top_chunk, rerank, min_score_threshold } = validation.data
const queries = await expandQuery(query)
const vectors = await vectorizeText(queries)

const queryPoints = await qdrantCLient.search(qdrant_collection_name, {
const queryPoints = await qdrantClient.search(qdrant_collection_name, {
vector: vectors,
filter: filter ? { must: [{ nested: { key: "_metadata", filter: filter } }] } : undefined,
filter: filter ? {
must: [
{ nested: { key: "_metadata", filter: filter } },
{ key: "_userId", match: { value: userId } }
]
} :
{ must: [{ key: "_userId", match: { value: userId } }] },
limit: rerank ? top_chunk * 2 : top_chunk,
with_payload: true,
with_vector: true
Expand Down
1 change: 0 additions & 1 deletion app/api/upload/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ export async function POST(request: NextRequest) {
code: "bad_request",
message: errorForm.message,
}, { status: 400 })

formData.set("userId", userId)
formData.set("service", "DIRECT_UPLOAD")
const {
Expand Down
Loading