DifyThread.java 4.94 KB
Newer Older
何处是我家's avatar
提交  
何处是我家 committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package com.ewaytek.deepseek.task;

import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.BlockingVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.doadmin.vo.dify.MetadataVO;
import com.ewaytek.deepseek.doadmin.vo.dify.RetrieverResources;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.collections4.CollectionUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;

/**
 * @author yangtq
 * @date 2025/2/28
 */
@Slf4j
public class DifyThread implements Runnable{


    private DIfyImportVerifyDTO data;
    private DifyConfig difyConfig;
    private static final RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒最多 5 次调用

    private OkHttpClient httpClient;

    // 静态共享集合,用于存储所有任务的结果
    private static final List<DIfyImportVerifyDTO> results = Collections.synchronizedList(new ArrayList<>());

    private final Consumer<DIfyImportVerifyDTO> callback;




    public DifyThread(DIfyImportVerifyDTO data, DifyConfig difyConfig, OkHttpClient httpClient, Consumer<DIfyImportVerifyDTO> callback){
        this.data = data;
        this.difyConfig = difyConfig;
        this.httpClient=httpClient;
        this.callback = callback;
    }

    @Override
    public void run() {
        try {
            rateLimiter.acquire(); // 获取令牌,阻塞等待直到获取成功
            // 调用大模型处理数据
            DIfyImportVerifyDTO result = processBatch(data);
            if(callback!=null){
                callback.accept(result); // 调用回调
            }
            // 处理结果
            System.out.println("Processed result: " + result.getAnswer());
        } catch (Exception e) {
            System.err.println("Error processing data: " + e.getMessage());
        }
    }

    private DIfyImportVerifyDTO processBatch(DIfyImportVerifyDTO dto) throws Exception {
        DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
        difyChatDTO.setQuery(dto.getQuestion());
        difyChatDTO.setUser("ewaytek"+System.currentTimeMillis());
        difyChatDTO.setResponseMode("streaming");
        //构建请求参数json数据
        ObjectMapper mapper = new ObjectMapper();
        String requestBody = mapper.writeValueAsString(difyChatDTO);
        Headers headers   = new Headers.Builder().add("Authorization", "Bearer " + difyConfig.getApiKey()).add("Content-Type", "application/json").build();

        Request request = new Request.Builder().url(difyConfig.getApiHost() + "chat-messages").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
                .headers(headers).build();
        List<RetrieverResources> retrieverResourcesList=new ArrayList<>();

        try (Response response = httpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {

                return dto;
            }
            // 处理流式响应
            ResponseBody responseBody = response.body();
            if (responseBody != null) {
                StringBuilder responseBuilder = new StringBuilder();

                while (!responseBody.source().exhausted()) {
                    String line = responseBody.source().readUtf8Line();
                    if (line != null && !line.isEmpty()) {
                        if (line.startsWith("data:")) { // 处理 SSE 格式
                            String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
                            DifyStreamVO blockingVO =   JSON.parseObject(eventData, DifyStreamVO.class);
                            MetadataVO metadataVO= blockingVO.getMetadata();
                            if(metadataVO!=null){
                                if (!CollectionUtils.isEmpty(metadataVO.getRetrieverResources())) {
                                    retrieverResourcesList.addAll(metadataVO.getRetrieverResources());
                                }
                            }
                            responseBuilder.append(blockingVO.getAnswer());
                        }
                    }
                }
                dto.setRetrieverResources(retrieverResourcesList);
                dto.setAnswer(responseBuilder.toString());
            }

        } catch (IOException e) {
            log.error(e.getMessage());
        }
        return dto;
    }


}