diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py
index aea230bceb4..a17346693da 100644
--- a/fastdeploy/model_executor/layers/sample/sampler.py
+++ b/fastdeploy/model_executor/layers/sample/sampler.py
@@ -39,6 +39,7 @@
     top_k_top_p_sampling,
 )
 from fastdeploy.platforms import current_platform
+from fastdeploy.utils import data_processor_logger
 from fastdeploy.worker.output import LogprobsTensors, SamplerOutput
 
 if current_platform.is_cuda():
@@ -223,6 +224,8 @@ def __init__(self, fd_config: FDConfig = None):
             self.early_stopper = early_stopper_cls()
             self.early_stopper.initialize(fd_config.parallel_config.max_num_seqs, fd_config.early_stop_config)
 
+        self.entropy_list = [[] for _ in range(fd_config.parallel_config.max_num_seqs)]
+
     def apply_logits_processor(
         self,
         ids: int,
@@ -317,6 +320,40 @@ def gather_logprobs(
 
         return LogprobsTensors(indices, top_logprobs, token_ranks)
 
+    def calculate_logits_entropy(self, logits, share_inputs, temperature):
+        real_bsz = share_inputs["seq_lens_this_time"].shape[0]
+        real_seq_lens = paddle.where(
+            share_inputs["seq_lens_encoder"][:real_bsz].squeeze(1) != 0,
+            paddle.ones([1], dtype="int32"),
+            share_inputs["seq_lens_this_time"].squeeze(1),
+        )
+
+        def get_entropy(logits):
+            a0 = logits - paddle.max(logits, axis=-1, keepdim=True)
+            ea0 = paddle.exp(a0)
+            z0 = paddle.sum(ea0, axis=-1, keepdim=True)
+            p0 = ea0 / z0
+            return paddle.sum(p0 * (paddle.log(z0) - a0), axis=-1)
+
+        batch_indices = paddle.arange(real_bsz, dtype="int32")
+        batch_id_per_token = paddle.repeat_interleave(batch_indices, real_seq_lens)
+        # print(f"[Sampler][entropy] batch_id_per_token: {batch_id_per_token}")
+        for i in range(logits.shape[0]):
+            if temperature[batch_id_per_token[i]] > 0 and temperature[batch_id_per_token[i]] != 1.0:
+                logits[i] = logits[i].scale_(1 / temperature[batch_id_per_token[i]])
+
+        entropy_tensor = get_entropy(logits)
+        entropy = entropy_tensor.tolist()
+
+        for i in range(real_bsz):
+            for _ in range(real_seq_lens[i]):
+                self.entropy_list[i].append(entropy.pop(0))
+            if share_inputs["stop_flags"][i] and len(self.entropy_list[i]) != 0:
+                data_processor_logger.info(
+                    f"req_id: {share_inputs['req_ids'][i]}, entropy: {sum(self.entropy_list[i])/len(self.entropy_list[i])}"
+                )
+                self.entropy_list[i] = []
+
     def forward_cuda(
         self,
         logits: paddle.Tensor,
@@ -374,6 +411,19 @@ def forward_cuda(
             logprobs_tensors=logprobs_tensors,
         )
 
+        # print(f"[Sampler] req_ids: {share_inputs['req_ids']}")
+        # print(f"[Sampler] next_tokens: {next_tokens}")
+        # print(f"[Sampler] seq_lens_this_time: {share_inputs['seq_lens_this_time']}")
+        # print(f"[Sampler] seq_lens_encoder: {share_inputs['seq_lens_encoder']}")
+        # print(f"[Sampler] logits: {logits}")
+        # print(f"[Sampler] temperature: {sampling_metadata.temperature}")
+        # print(f"[Sampler] stop_flags: {share_inputs['stop_flags']}")
+        # print(f"[Sampler] entropy_list: {self.entropy_list}")
+
+        self.calculate_logits_entropy(
+            paddle.clone(logits), sampling_metadata.share_inputs, sampling_metadata.temperature
+        )
+
         return sampler_output
 
 
@@ -393,6 +443,7 @@ def __init__(self, fd_config: FDConfig):
         self.speculative_max_candidate_len = fd_config.speculative_config.max_candidate_len
         self.speculative_benchmark_mode = fd_config.speculative_config.benchmark_mode
         self.speculative_tokens_num = fd_config.speculative_config.num_speculative_tokens
+        self.entropy_list = [[] for _ in range(fd_config.parallel_config.max_num_seqs)]
 
     def pre_process(self, skip_idx_list: List[int] = []):
         """pre process before running"""
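For reference, `get_entropy` is the standard max-shifted softmax entropy: with `a0 = logits - max(logits)` and `z0 = sum(exp(a0))`, the probabilities are `p0 = exp(a0) / z0` and `log p0 = a0 - log z0`, so `sum(p0 * (log z0 - a0)) = -sum(p0 * log p0) = H(p)`. The shift keeps `exp()` from overflowing on large logits. A minimal standalone sketch (not part of the patch) that checks the shifted form against the naive definition:

```python
import paddle

def get_entropy(logits):
    # Max-shifted softmax entropy: subtracting the row max avoids overflow in exp().
    a0 = logits - paddle.max(logits, axis=-1, keepdim=True)
    ea0 = paddle.exp(a0)
    z0 = paddle.sum(ea0, axis=-1, keepdim=True)
    p0 = ea0 / z0
    # sum(p0 * (log z0 - a0)) == -sum(p0 * log p0), since log p0 = a0 - log z0.
    return paddle.sum(p0 * (paddle.log(z0) - a0), axis=-1)

logits = paddle.randn([4, 32000])  # toy [num_tokens, vocab_size] batch
p = paddle.nn.functional.softmax(logits, axis=-1)
naive = -paddle.sum(p * paddle.log(p), axis=-1)  # naive H(p) = -sum(p * log p)
assert paddle.allclose(get_entropy(logits), naive, atol=1e-5).item()
```

Note that the base sampler passes `paddle.clone(logits)` into `calculate_logits_entropy` because the temperature loop scales rows in place with `scale_`, which would otherwise mutate the caller's logits.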
@@ -499,6 +550,58 @@ def gather_logprobs(
 
         return LogprobsTensors(indices, top_logprobs, token_ranks)
 
+    def calculate_logits_entropy(self, logits, share_inputs, temperature):
+        # get accepted logits
+        real_bsz = share_inputs["seq_lens_this_time"].shape[0]
+        total_accepted_num = paddle.sum(share_inputs["accept_num"])
+        real_seq_lens = paddle.where(
+            share_inputs["seq_lens_encoder"][:real_bsz].squeeze(1) != 0,
+            paddle.ones([1], dtype="int32"),
+            share_inputs["seq_lens_this_time"].squeeze(1),
+        )
+        # print(f"[entropy] real_seq_lens: {real_seq_lens}")
+        seq_start_idx = paddle.concat([paddle.zeros([1], dtype="int32"), paddle.cumsum(real_seq_lens, dtype="int32")])
+        # print(f"[entropy] seq_start_idx: {seq_start_idx}")
+        repeated_starts = paddle.repeat_interleave(seq_start_idx[:-1], share_inputs["accept_num"][:real_bsz])
+        # print(f"[entropy] repeated_starts: {repeated_starts}")
+        offsets = paddle.concat([paddle.arange(share_inputs["accept_num"][i].item()) for i in range(real_bsz)]).astype(
+            "int32"
+        )
+        # print(f"[entropy] offsets: {offsets}")
+        accepted_idx = repeated_starts + offsets
+        # print(f"[entropy] accepted_idx: {accepted_idx}")
+
+        accepted_logits = paddle.empty([total_accepted_num, logits.shape[1]], dtype=logits.dtype)
+        # print(f"[entropy] accepted_logits shape: {accepted_logits.shape}")
+        for i in range(total_accepted_num):
+            accepted_logits[i] = logits[accepted_idx[i]]
+
+        def get_entropy(logits):
+            a0 = logits - paddle.max(logits, axis=-1, keepdim=True)
+            ea0 = paddle.exp(a0)
+            z0 = paddle.sum(ea0, axis=-1, keepdim=True)
+            p0 = ea0 / z0
+            return paddle.sum(p0 * (paddle.log(z0) - a0), axis=-1)
+
+        batch_indices = paddle.arange(share_inputs["accept_num"].shape[0], dtype="int32")
+        batch_id_per_token = paddle.repeat_interleave(batch_indices, share_inputs["accept_num"])
+        # print(f"[SpeculativeSampler][entropy] batch_id_per_token: {batch_id_per_token}")
+        for i in range(accepted_logits.shape[0]):
+            if temperature[batch_id_per_token[i]] > 0 and temperature[batch_id_per_token[i]] != 1.0:
+                accepted_logits[i] = accepted_logits[i].scale_(1 / temperature[batch_id_per_token[i]])
+
+        entropy_tensor = get_entropy(accepted_logits)
+        entropy = entropy_tensor.tolist()
+
+        for i in range(real_bsz):
+            for _ in range(share_inputs["accept_num"][i]):
+                self.entropy_list[i].append(entropy.pop(0))
+            if share_inputs["stop_flags"][i] and len(self.entropy_list[i]) != 0:
+                data_processor_logger.info(
+                    f"req_id: {share_inputs['req_ids'][i]}, entropy: {sum(self.entropy_list[i])/len(self.entropy_list[i])}"
+                )
+                self.entropy_list[i] = []
+
     def forward_cuda(
         self,
         logits: paddle.Tensor,
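The index arithmetic above selects, per request, only the rows of `logits` that correspond to accepted draft tokens: `seq_start_idx` is the exclusive prefix sum of per-request row counts, `repeated_starts` repeats each request's start row `accept_num[i]` times, and adding within-request offsets yields flat row indices. A small sketch with hypothetical toy values (not from the patch):

```python
import paddle

# Hypothetical counts: request 0 has 3 logits rows (2 accepted),
# request 1 has 2 rows (1 accepted).
real_seq_lens = paddle.to_tensor([3, 2], dtype="int32")
accept_num = paddle.to_tensor([2, 1], dtype="int32")

# Exclusive prefix sum: row where each request's logits start -> [0, 3, 5]
seq_start_idx = paddle.concat([paddle.zeros([1], dtype="int32"), paddle.cumsum(real_seq_lens, dtype="int32")])

# Repeat each start accept_num[i] times -> [0, 0, 3]
repeated_starts = paddle.repeat_interleave(seq_start_idx[:-1], accept_num)

# 0..accept_num[i]-1 within each request -> [0, 1, 0]
offsets = paddle.concat([paddle.arange(accept_num[i].item()) for i in range(2)]).astype("int32")

accepted_idx = repeated_starts + offsets
print(accepted_idx.tolist())  # [0, 1, 3]: rows 0-1 of request 0, row 3 of request 1
```

With the indices in hand, the row-by-row copy loop in the patch is equivalent to a single gather such as `paddle.index_select(logits, accepted_idx)`, which may be worth considering to avoid the Python-level loop.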
{sampling_metadata.temperature}") + # print(f"[SpeculativeSampler] stop_flags: {share_inputs['stop_flags']}") + # print(f"[SpeculativeSampler] entropy_list: {self.entropy_list}") + + self.calculate_logits_entropy(logits, share_inputs, sampling_metadata.temperature) + return sampler_output