import numpy as np
from scipy.sparse import csr_matrix
import cudaq_qec as qec
import json
import time
import requests
import bz2
import os
def parse_csr_mat(j, dims, mat_name):
"""
Parse a CSR-style matrix from a JSON file using SciPy's sparse matrix utilities.
"""
assert len(dims) == 2, "dims must be a tuple of two integers"
indptr = np.array(j[f"{mat_name}_indptr"], dtype=int)
indices = np.array(j[f"{mat_name}_indices"], dtype=int)
assert len(indptr) == dims[0] + 1, "indptr length must equal dims[0] + 1"
assert np.all(
indices < dims[1]), "All column indices must be less than dims[1]"
data = np.ones(indptr[-1], dtype=np.uint8)
csr = csr_matrix((data, indices, indptr), shape=dims, dtype=np.uint8)
return csr.toarray()
def parse_H_csr(j, dims):
"""
Parse a CSR-style parity check matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "H")
def parse_obs_csr(j, dims):
"""
Parse a CSR-style observable matrix from an input file in JSON format"
"""
return parse_csr_mat(j, dims, "obs_mat")
def run_decoder(filename, num_shots, run_as_batched, print_output=False, osd=0):
"""
Load a JSON file and decode "num_shots" syndromes.
"""
t_load_begin = time.time()
with open(filename, "r") as f:
j = json.load(f)
dims = j["shape"]
assert len(dims) == 2
H = parse_H_csr(j, dims)
syndrome_length, block_length = dims
t_load_end = time.time()
error_rate_vec = np.array(j["error_rate_vec"])
assert len(error_rate_vec) == block_length
obs_mat_dims = j["obs_mat_shape"]
obs_mat = parse_obs_csr(j, obs_mat_dims)
assert dims[1] == obs_mat_dims[0]
file_num_trials = j["num_trials"]
num_shots = min(num_shots, file_num_trials)
print(
f'Your JSON file has {file_num_trials} shots. Running {num_shots} now.')
osd_method = osd
osd_order = 0
max_iter = 50
nv_dec_args = {
"max_iterations": max_iter,
"error_rate_vec": error_rate_vec,
"use_sparsity": True,
"use_osd": osd_method > 0,
"osd_order": osd_order,
"osd_method": osd_method
}
if run_as_batched:
nv_dec_args['bp_batch_size'] = min(1000, num_shots)
try:
nv_dec_gpu_and_cpu = qec.get_decoder("nv-qldpc-decoder", H,
**nv_dec_args)
except Exception as e:
print(
'The nv-qldpc-decoder is not available with your current CUDA-Q ' +
'QEC installation.')
exit(0)
decoding_time = 0
bp_converged_flags = []
num_logical_errors = 0
if run_as_batched:
syndrome_list = []
obs_truth_list = []
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
syndrome_list.append(syndrome)
obs_truth_list.append(obs_truth)
t0 = time.time()
results = nv_dec_gpu_and_cpu.decode_batch(syndrome_list)
t1 = time.time()
decoding_time += t1 - t0
for r, obs_truth in zip(results, obs_truth_list):
bp_converged_flags.append(r.converged)
dec_result = np.array(r.result, dtype=np.uint8)
predicted_observable = obs_mat.T @ dec_result % 2
if print_output == True:
print(f"predicted_observable: {predicted_observable}")
actual_observable = np.array(obs_truth, dtype=np.uint8)
if print_output == True:
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
else:
for i in range(num_shots):
syndrome = j["trials"][i]["syndrome_truth"]
obs_truth = j["trials"][i]["obs_truth"]
t0 = time.time()
bp_converged, dec_result, *_ = nv_dec_gpu_and_cpu.decode(syndrome)
t1 = time.time()
trial_diff = t1 - t0
decoding_time += trial_diff
dec_result = np.array(dec_result, dtype=np.uint8)
bp_converged_flags.append(bp_converged)
predicted_observable = obs_mat.T @ dec_result % 2
if print_output == True:
print(f"predicted_observable: {predicted_observable}")
actual_observable = np.array(obs_truth, dtype=np.uint8)
if print_output == True:
print(f"actual_observable: {actual_observable}")
if np.sum(predicted_observable != actual_observable) > 0:
num_logical_errors += 1
print(f"{num_logical_errors} logical errors in {num_shots} shots")
print(
f"Number of shots that converged with BP processing: {np.sum(np.array(bp_converged_flags))}"
)
print(
f"Average decoding time for {num_shots} shots was {1e3 * decoding_time / num_shots} ms per shot"
)