File size: 7,468 Bytes
7819b34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Reward calculation for CosyVoice2-0.5B.
"""

from __future__ import annotations

import re
import json
import time
import argparse
from typing import List

import numpy as np
import requests


REWARD_SERVER_URL = "http://localhost:8000/v2/models/token2wav_asr/infer"


def _parse_ids(token_str: str) -> List[int]:
    return [int(t) for t in re.findall(r"<\|s_(\d+)\|>", token_str)]


def _remote_reward(tokens: List[int], ground_truth: str, timeout: float = 200.0) -> float:
    """Send token IDs and ground-truth text to the Triton server and get reward."""

    tokens_arr = np.array(tokens, dtype=np.int32).reshape(1, -1)
    lens_arr = np.array([[tokens_arr.shape[1]]], dtype=np.int32)

    gt_arr = np.array([ground_truth.encode("utf-8")], dtype=object)

    payload = {
        "inputs": [
            {
                "name": "TOKENS",
                "shape": list(tokens_arr.shape),
                "datatype": "INT32",
                "data": tokens_arr.tolist(),
            },
            {
                "name": "TOKEN_LENS",
                "shape": list(lens_arr.shape),
                "datatype": "INT32",
                "data": lens_arr.tolist(),
            },
            {
                "name": "GT_TEXT",
                "shape": [1, 1],
                "datatype": "BYTES",
                "data": [ground_truth],
            },
        ]
    }
    rsp = requests.post(
        REWARD_SERVER_URL,
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=timeout,
        verify=False,
        params={"request_id": "0"},
    )
    rsp.raise_for_status()
    result = rsp.json()

    try:
        # Reward is returned as the first output
        return float(result["outputs"][0]["data"][0])
    except (KeyError, IndexError, TypeError):
        return 0.0


def compute_score(
    data_source: str,
    solution_str: str,
    ground_truth: str,
    extra_info: dict | None = None,
    *,
    debug_dump: bool = False,
) -> float:
    """Return reward in [0, 1] using the Triton ASR service.

    The reward is based on the pinyin-level WER between the ASR transcript
    produced from *solution_str* and the provided *ground_truth* text.
    """

    # Decode token IDs
    ids = _parse_ids(solution_str)

    # Query remote server for reward
    try:
        reward = _remote_reward(ids, ground_truth)
    except Exception as e:
        reward = 0.0

    if debug_dump:
        print(
            f"\033[92m[{data_source}] Remote reward: {reward:.4f}\033[0m"
        )

    return reward


# CLI quick test
if __name__ == "__main__":
    import sys

    def get_args():
        """Parse command line arguments."""
        parser = argparse.ArgumentParser(
            description="Test TTS CER scoring with data from JSONL file",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter
        )

        parser.add_argument(
            "--input", "-i",
            type=str,
            default="data/emilia_zh-cosy-tiny-test.jsonl",
            help="Path to input JSONL file"
        )

        parser.add_argument(
            "--max-samples", "-n",
            type=int,
            default=None,
            help="Maximum number of samples to process (default: all)"
        )

        parser.add_argument(
            "--no-interactive",
            action="store_true",
            help="Run in non-interactive mode (process all samples without prompts)"
        )

        parser.add_argument(
            "--debug",
            action="store_true",
            help="Enable debug mode"
        )

        return parser.parse_args()

    def load_jsonl(file_path: str):
        """Load data from jsonl file."""
        data = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                data.append(json.loads(line.strip()))
        return data

    def code_to_solution_str(code_list: List[int]) -> str:
        """Convert code list to solution string format."""
        return ''.join([f"<|s_{code}|>" for code in code_list])

    # Parse command line arguments
    args = get_args()

    try:
        # Load data from jsonl file
        print(f"Loading data from: {args.input}")
        data_list = load_jsonl(args.input)
        print(f"Loaded {len(data_list)} samples")

        # Limit samples if specified
        if args.max_samples is not None:
            data_list = data_list[:args.max_samples]
            print(f"Processing first {len(data_list)} samples (limited by --max-samples)")

        # Process each sample
        begin_time = time.time()
        for i, sample in enumerate(data_list):
            print(f"\n--- Sample {i+1}/{len(data_list)} ---")
            print(f"Index: {sample.get('index', 'unknown')}")
            print(f"Text: {sample['text']}")

            # Extract required fields
            code_list = sample['code']
            ground_truth = sample['text']
            data_source = sample.get('index', f'sample_{i}')  # Use index as data_source

            # Convert code list to solution string
            solution_str = code_to_solution_str(code_list)
            print(f"Solution tokens: {len(code_list)} tokens")
            if args.debug:
                print(f"Solution string: {solution_str}")
            else:
                print(f"Solution string preview: {solution_str[:100]}..." if len(solution_str) > 100 else f"Solution string: {solution_str}")

            # Call compute_score function
            try:
                score = compute_score(
                    data_source=data_source,
                    solution_str=solution_str,
                    ground_truth=ground_truth,
                    extra_info=None,
                    debug_dump=args.debug
                )
                print(f"Final Score: {score:.4f}")
            except Exception as e:
                print(f"Error computing score: {e}")

            # Ask user if they want to continue (for interactive mode)
            if not args.no_interactive and i < len(data_list) - 1:
                try:
                    response = input("\nPress Enter to continue or 'q' to quit: ").strip().lower()
                    if response == 'q':
                        break
                except KeyboardInterrupt:
                    print("\nStopped by user")
                    break

        print(f"\nProcessed {min(i+1, len(data_list))} samples")
        end_time = time.time()
        print(f"Time taken: {end_time - begin_time} seconds")
    except FileNotFoundError:
        print(f"Error: File not found - {args.input}")
        print("Please check the file path or use --input to specify correct path")
        print("Run with --help for usage information")
    except Exception as e:
        print(f"Error: {e}")