Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/reference manager #191

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cursorrules
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Always output the code comment in English.
6 changes: 5 additions & 1 deletion apps/api/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,14 @@ model Reference {
sourceType String @map("source_type")
/// Source entity ID
sourceId String @map("source_id")
/// Target entity type (e.g., "canvas" or "resource")
/// Source entity metadata (JSON), should only be used for external url references
sourceMeta String? @map("source_meta")
/// Target entity type (e.g., "canvas", "resource", "externalUrl")
targetType String @map("target_type")
/// Target entity ID
targetId String @map("target_id")
/// Target entity metadata (JSON), should only be used for external url references
targetMeta String? @map("target_meta")
/// UID of the user who created the reference
uid String @map("uid")
/// Create timestamp
Expand Down
12 changes: 9 additions & 3 deletions apps/api/src/knowledge/knowledge.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export type FinalizeResourceParam = {
uid: string;
};

export type ParseReferenceExternalUrlParam = {
referenceIds: string[];
};

export const projectPO2DTO = (project: ProjectModel): Project => {
return {
...pick(project, ['projectId', 'title', 'description', 'shareCode']),
Expand Down Expand Up @@ -76,13 +80,15 @@ export const canvasPO2DTO = (
};

export interface ExtendedReferenceModel extends ReferenceModel {
sourceMeta?: ReferenceMeta;
targetMeta?: ReferenceMeta;
sourceMetaObj?: ReferenceMeta;
targetMetaObj?: ReferenceMeta;
}

export const referencePO2DTO = (reference: ExtendedReferenceModel): Reference => {
return {
...pick(reference, ['referenceId', 'sourceId', 'targetId', 'sourceMeta', 'targetMeta']),
...pick(reference, ['referenceId', 'sourceId', 'targetId']),
sourceMeta: reference.sourceMetaObj ?? JSON.parse(reference.sourceMeta || '{}'),
targetMeta: reference.targetMetaObj ?? JSON.parse(reference.targetMeta || '{}'),
sourceType: reference.sourceType as ReferenceType,
targetType: reference.targetType as ReferenceType,
};
Expand Down
12 changes: 9 additions & 3 deletions apps/api/src/knowledge/knowledge.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import { CommonModule } from '@/common/common.module';
import { RAGModule } from '@/rag/rag.module';
import { MiscModule } from '@/misc/misc.module';
import { SubscriptionModule } from '@/subscription/subscription.module';
import { ResourceProcessor } from './knowledge.processor';
import { ReferenceProcessor, ResourceProcessor } from './knowledge.processor';
import { CanvasWsGateway } from './knowledge.gateway';
import { QUEUE_RESOURCE, QUEUE_SIMPLE_EVENT, QUEUE_SYNC_STORAGE_USAGE } from '@/utils';
import {
QUEUE_RESOURCE,
QUEUE_PARSE_REF_URL,
QUEUE_SIMPLE_EVENT,
QUEUE_SYNC_STORAGE_USAGE,
} from '@/utils';

@Module({
imports: [
Expand All @@ -20,11 +25,12 @@ import { QUEUE_RESOURCE, QUEUE_SIMPLE_EVENT, QUEUE_SYNC_STORAGE_USAGE } from '@/
MiscModule,
SubscriptionModule,
BullModule.registerQueue({ name: QUEUE_RESOURCE }),
BullModule.registerQueue({ name: QUEUE_PARSE_REF_URL }),
BullModule.registerQueue({ name: QUEUE_SIMPLE_EVENT }),
BullModule.registerQueue({ name: QUEUE_SYNC_STORAGE_USAGE }),
],
controllers: [KnowledgeController],
providers: [KnowledgeService, ResourceProcessor, CanvasWsGateway],
providers: [KnowledgeService, ResourceProcessor, ReferenceProcessor, CanvasWsGateway],
exports: [KnowledgeService],
})
export class KnowledgeModule {}
23 changes: 21 additions & 2 deletions apps/api/src/knowledge/knowledge.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { Logger } from '@nestjs/common';
import { Job } from 'bull';

import { KnowledgeService } from './knowledge.service';
import { QUEUE_RESOURCE, CHANNEL_FINALIZE_RESOURCE } from '../utils/const';
import { FinalizeResourceParam } from './knowledge.dto';
import { QUEUE_RESOURCE, CHANNEL_FINALIZE_RESOURCE, QUEUE_PARSE_REF_URL } from '../utils/const';
import { FinalizeResourceParam, ParseReferenceExternalUrlParam } from './knowledge.dto';

@Processor(QUEUE_RESOURCE)
export class ResourceProcessor {
Expand All @@ -24,3 +24,22 @@ export class ResourceProcessor {
}
}
}

@Processor(QUEUE_PARSE_REF_URL)
export class ReferenceProcessor {
private readonly logger = new Logger(ReferenceProcessor.name);

constructor(private knowledgeService: KnowledgeService) {}

@Process()
async handleParseReferenceExternalUrl(job: Job<ParseReferenceExternalUrlParam>) {
this.logger.log(`[handleParseReferenceExternalUrl] job: ${JSON.stringify(job)}`);

try {
await this.knowledgeService.parseReferenceExternalUrl(job.data);
} catch (error) {
this.logger.error(`[handleParseReferenceExternalUrl] error: ${error?.stack}`);
throw error;
}
}
}
94 changes: 74 additions & 20 deletions apps/api/src/knowledge/knowledge.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
UpsertProjectRequest,
BindProjectResourceRequest,
QueryReferencesRequest,
ReferenceType,
BaseReference,
AddReferencesRequest,
DeleteReferencesRequest,
Expand All @@ -38,6 +37,7 @@ import {
QUEUE_RESOURCE,
streamToString,
QUEUE_SYNC_STORAGE_USAGE,
QUEUE_PARSE_REF_URL,
} from '@/utils';
import {
genResourceID,
Expand All @@ -47,7 +47,11 @@ import {
genProjectID,
genReferenceID,
} from '@refly-packages/utils';
import { ExtendedReferenceModel, FinalizeResourceParam } from './knowledge.dto';
import {
ExtendedReferenceModel,
FinalizeResourceParam,
ParseReferenceExternalUrlParam,
} from './knowledge.dto';
import { pick } from '../utils';
import { SimpleEventData } from '@/event/event.dto';
import { SyncStorageUsageJobData } from '@/subscription/subscription.dto';
Expand Down Expand Up @@ -75,6 +79,7 @@ export class KnowledgeService {
private subscriptionService: SubscriptionService,
@Inject(MINIO_INTERNAL) private minio: MinioService,
@InjectQueue(QUEUE_RESOURCE) private queue: Queue<FinalizeResourceParam>,
@InjectQueue(QUEUE_PARSE_REF_URL) private refQueue: Queue<ParseReferenceExternalUrlParam>,
@InjectQueue(QUEUE_SIMPLE_EVENT) private simpleEventQueue: Queue<SimpleEventData>,
@InjectQueue(QUEUE_SYNC_STORAGE_USAGE) private ssuQueue: Queue<SyncStorageUsageJobData>,
) {}
Expand Down Expand Up @@ -939,7 +944,7 @@ export class KnowledgeService {
user: User,
param: QueryReferencesRequest,
): Promise<ExtendedReferenceModel[]> {
const { sourceType, sourceId, targetType, targetId } = param;
const { sourceType, sourceId, targetType, targetId, referenceId } = param;

// Check if the source and target entities exist for this user
const entityChecks: Promise<void>[] = [];
Expand All @@ -951,7 +956,9 @@ export class KnowledgeService {
}
await Promise.all(entityChecks);

const where: Prisma.ReferenceWhereInput = {};
const where: Prisma.ReferenceWhereInput = {
referenceId,
};
if (sourceType && sourceId) {
where.sourceType = sourceType;
where.sourceId = sourceId;
Expand Down Expand Up @@ -998,17 +1005,24 @@ export class KnowledgeService {
});
}

const genReferenceMeta = (sourceType: string, sourceId: string) => {
const genReferenceMetaObj = (refType: string, refId: string) => {
// Don't generate metadata object for external URLs, use targetMeta directly
if (refType === 'externalUrl') {
return null;
}

let refMeta: ReferenceMeta;
if (sourceType === 'resource') {
if (refType === 'resource') {
refMeta = {
title: resourceMap[sourceId]?.title,
url: JSON.parse(resourceMap[sourceId]?.meta || '{}')?.url,
title: resourceMap[refId]?.title,
url: JSON.parse(resourceMap[refId]?.meta || '{}')?.url,
description: resourceMap[refId]?.contentPreview,
};
} else if (sourceType === 'canvas') {
} else if (refType === 'canvas') {
refMeta = {
title: canvasMap[sourceId]?.title,
projectId: canvasMap[sourceId]?.projectId,
title: canvasMap[refId]?.title,
projectId: canvasMap[refId]?.projectId,
description: canvasMap[refId]?.contentPreview,
};
}
return refMeta;
Expand All @@ -1018,8 +1032,8 @@ export class KnowledgeService {
return references.map((ref) => {
return {
...ref,
sourceMeta: genReferenceMeta(ref.sourceType, ref.sourceId),
targetMeta: genReferenceMeta(ref.targetType, ref.targetId),
sourceMetaObj: genReferenceMetaObj(ref.sourceType, ref.sourceId),
targetMetaObj: genReferenceMetaObj(ref.targetType, ref.targetId),
};
});
}
Expand All @@ -1028,16 +1042,17 @@ export class KnowledgeService {
user: User,
references: BaseReference[],
): Promise<Prisma.ReferenceCreateManyInput[]> {
const validRefTypes: ReferenceType[] = ['resource', 'canvas'];

// Deduplicate references using a Set with stringified unique properties
const uniqueRefs = new Set(
references.map((ref) =>
JSON.stringify({
sourceType: ref.sourceType,
sourceId: ref.sourceId,
targetType: ref.targetType,
targetId: ref.targetId,
targetId:
ref.targetType === 'externalUrl'
? normalizeUrl(ref.targetId, { stripHash: true })
: ref.targetId,
}),
),
);
Expand All @@ -1047,10 +1062,10 @@ export class KnowledgeService {
const canvasIds: Set<string> = new Set();

deduplicatedRefs.forEach((ref) => {
if (!validRefTypes.includes(ref.sourceType)) {
if (!['resource', 'canvas'].includes(ref.sourceType)) {
throw new ParamsError(`Invalid source type: ${ref.sourceType}`);
}
if (!validRefTypes.includes(ref.targetType)) {
if (!['resource', 'canvas', 'externalUrl'].includes(ref.targetType)) {
throw new ParamsError(`Invalid target type: ${ref.targetType}`);
}
if (ref.sourceType === 'resource' && ref.targetType === 'canvas') {
Expand Down Expand Up @@ -1098,7 +1113,8 @@ export class KnowledgeService {
...canvases.map((c) => c.canvasId),
]);
const missingEntities = deduplicatedRefs.filter(
(e) => !foundIds.has(e.sourceId) || !foundIds.has(e.targetId),
(e) =>
!foundIds.has(e.sourceId) || (e.targetType !== 'externalUrl' && !foundIds.has(e.targetId)),
);
if (missingEntities.length > 0) {
this.logger.warn(`Entities not found: ${JSON.stringify(missingEntities)}`);
Expand All @@ -1116,7 +1132,7 @@ export class KnowledgeService {
const { references } = param;
const referenceInputs = await this.prepareReferenceInputs(user, references);

return this.prisma.$transaction(
const refResults = await this.prisma.$transaction(
referenceInputs.map((input) =>
this.prisma.reference.upsert({
where: {
Expand All @@ -1132,6 +1148,19 @@ export class KnowledgeService {
}),
),
);

const externalUrlReferenceIds = refResults
.filter((ref) => ref.targetType === 'externalUrl')
.map((ref) => ref.referenceId);

// Process external URLs in batches of 10
const BATCH_SIZE = 10;
for (let i = 0; i < externalUrlReferenceIds.length; i += BATCH_SIZE) {
const batch = externalUrlReferenceIds.slice(i, i + BATCH_SIZE);
await this.refQueue.add({ referenceIds: batch });
}

return refResults;
}

async deleteReferences(user: User, param: DeleteReferencesRequest) {
Expand All @@ -1158,4 +1187,29 @@ export class KnowledgeService {
},
});
}

async parseReferenceExternalUrl(param: ParseReferenceExternalUrlParam) {
const { referenceIds } = param;

const references = await this.prisma.reference.findMany({
select: { referenceId: true, targetId: true },
where: { referenceId: { in: referenceIds }, deletedAt: null },
});
const urls = references.map((ref) => ref.targetId);

const scrapedUrls = await Promise.all(
urls.map((url) => this.miscService.scrapeWeblink({ url })),
);

await this.prisma.$transaction(
references.map((ref, index) =>
this.prisma.reference.update({
where: { referenceId: ref.referenceId },
data: {
targetMeta: JSON.stringify(scrapedUrls[index]),
},
}),
),
);
}
}
3 changes: 2 additions & 1 deletion apps/api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ async function bootstrap() {

await app.listen(configService.get('port'));
}
bootstrap();

bootstrap().catch((err) => Sentry.captureException(err));
1 change: 1 addition & 0 deletions apps/api/src/misc/misc.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export class MiscService {
const result = await scrapeWeblink(url);

return {
url,
title: result.title,
description: result.description,
image: result.image,
Expand Down
14 changes: 7 additions & 7 deletions apps/api/src/utils/const.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
export const QUEUE_WEBLINK = 'weblink';
export const QUEUE_RESOURCE = 'resource';
export const CHANNEL_FINALIZE_RESOURCE = 'finalizeResource';

export const QUEUE_PARSE_REF_URL = 'parseRefUrl';

export const QUEUE_SKILL = 'skill';
export const CHANNEL_INVOKE_SKILL = 'invokeSkill';

export const QUEUE_SIMPLE_EVENT = 'simpleEvent';

export const QUEUE_SYNC_TOKEN_USAGE = 'syncTokenUsage';
export const QUEUE_SYNC_STORAGE_USAGE = 'syncStorageUsage';

export const CHANNEL_PROCESS_LINK = 'processLink';
export const CHANNEL_PROCESS_LINK_BY_USER = 'processLinkByUser';
export const CHANNEL_FINALIZE_RESOURCE = 'finalizeResource';
export const CHANNEL_INVOKE_SKILL = 'invokeSkill';
export const CHANNEL_REPORT_EVENT = 'reportEvent';
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export const BaseSearchAndSelector = ({
onSearchValueChange(val);
}
setSearchValue(val);
console.log('handleSearch', val, activeTab);
handleSearch(val, activeTab);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { BaseMarkType, frontendBaseMarkTypes, backendBaseMarkTypes, Mark } from
import { useLoadExtensionWeblinkData } from './use-load-weblink-data.extension';

interface UseSearchStrategyProps {
limit?: number;
source: MessageIntentSource;
onLoadingChange?: (loading: boolean) => void;
}
Expand All @@ -35,7 +36,7 @@ const mapSearchResultToMark = (searchResult: SearchResult): Mark => {
return newMark;
};

export const useSearchStrategy = ({ source, onLoadingChange }: UseSearchStrategyProps) => {
export const useSearchStrategy = ({ limit = 5, source, onLoadingChange }: UseSearchStrategyProps) => {
const [displayMode, setDisplayMode] = useState<'search' | 'list'>('list');
const searchStore = useSearchStore();

Expand Down Expand Up @@ -83,6 +84,7 @@ export const useSearchStrategy = ({ source, onLoadingChange }: UseSearchStrategy
body: {
query: searchVal,
domains: domains,
limit,
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@
margin-top: 8px;
display: flex;
align-items: center;
justify-content: flex-end;
justify-content: space-between;

.ai-copilot-answer-token-usage {
margin-left: -4px;
Expand Down
Loading