Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SamplerV2 and EstimatorV2 to be compatible with those of Qiskit 1.1 #2150

Merged
merged 8 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions qiskit_aer/primitives/estimator_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import numpy as np
from qiskit.primitives.base import BaseEstimatorV2
from qiskit.primitives.containers import EstimatorPubLike, PrimitiveResult, PubResult
from qiskit.primitives.containers import DataBin, EstimatorPubLike, PrimitiveResult, PubResult
from qiskit.primitives.containers.estimator_pub import EstimatorPub
from qiskit.primitives.primitive_job import PrimitiveJob
from qiskit.quantum_info import Pauli
Expand Down Expand Up @@ -132,7 +132,6 @@ def _run_pub(self, pub: EstimatorPub) -> PubResult:
).result()

# calculate expectation values (evs) and standard errors (stds)
rng = np.random.default_rng(self.options.run_options.get("seed_simulator"))
flat_indices = list(param_indices.ravel())
evs = np.zeros_like(bc_param_ind, dtype=float)
stds = np.full(bc_param_ind.shape, precision)
Expand All @@ -143,10 +142,11 @@ def _run_pub(self, pub: EstimatorPub) -> PubResult:
expval = result.data(flat_index)[pauli]
evs[index] += expval * coeff
if precision > 0:
evs = rng.normal(evs, precision)
data_bin_cls = self._make_data_bin(pub)
data_bin = data_bin_cls(evs=evs, stds=stds)
rng = np.random.default_rng(self.options.run_options.get("seed_simulator"))
if not np.all(np.isreal(evs)):
raise ValueError("Given operator is not Hermitian and noise cannot be added.")
evs = rng.normal(evs, precision, evs.shape)
return PubResult(
data_bin,
DataBin(evs=evs, stds=stds, shape=evs.shape),
metadata={"target_precision": precision, "simulator_metadata": result.metadata},
)
293 changes: 123 additions & 170 deletions qiskit_aer/primitives/sampler_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,33 @@

from __future__ import annotations

from typing import Iterable
from dataclasses import dataclass, field
import warnings
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Iterable

import numpy as np
from numpy.typing import NDArray

from qiskit import ClassicalRegister, QiskitError, QuantumCircuit
from qiskit.circuit import ControlFlowOp

from qiskit.primitives.backend_sampler_v2 import (
_analyze_circuit,
_MeasureInfo,
_memory_array,
_samples_to_packed_array,
)
from qiskit.primitives.base import BaseSamplerV2
from qiskit.primitives.base.validation import _has_measure
from qiskit.primitives.containers import (
BitArray,
DataBin,
PrimitiveResult,
PubResult,
SamplerPubLike,
make_data_bin,
SamplerPubResult,
)
from qiskit.primitives.containers.sampler_pub import SamplerPub
from qiskit.primitives.containers.bit_array import _min_num_bytes
from qiskit.primitives.primitive_job import PrimitiveJob
from qiskit.result import Result

from qiskit_aer import AerSimulator


@dataclass
class _MeasureInfo:
creg_name: str
num_bits: int
num_bytes: int
qreg_indices: list[int]


@dataclass
class Options:
"""Options for :class:`~.SamplerV2`."""
Expand Down Expand Up @@ -81,13 +74,13 @@ def __init__(
self,
*,
default_shots: int = 1024,
seed: np.random.Generator | int | None = None,
seed: int | None = None,
options: dict | None = None,
):
"""
Args:
default_shots: The default shots for the sampler if not specified during run.
seed: The seed or Generator object for random number generation.
seed: The seed for random number generation.
If None, a random seeded default RNG will be used.
options:
the backend options (``backend_options``), and
Expand All @@ -113,8 +106,8 @@ def default_shots(self) -> int:
return self._default_shots

@property
def seed(self) -> np.random.Generator | int | None:
"""Return the seed or Generator object for random number generation."""
def seed(self) -> int | None:
"""Return the seed for random number generation."""
return self._seed

@property
Expand All @@ -124,170 +117,130 @@ def options(self) -> Options:

def run(
self, pubs: Iterable[SamplerPubLike], *, shots: int | None = None
) -> PrimitiveJob[PrimitiveResult[PubResult]]:
) -> PrimitiveJob[PrimitiveResult[SamplerPubResult]]:
if shots is None:
shots = self._default_shots
coerced_pubs = [SamplerPub.coerce(pub, shots) for pub in pubs]
if any(len(pub.circuit.cregs) == 0 for pub in coerced_pubs):
warnings.warn(
"One of your circuits has no output classical registers and so the result "
"will be empty. Did you mean to add measurement instructions?",
UserWarning,
)

self._validate_pubs(coerced_pubs)
job = PrimitiveJob(self._run, coerced_pubs)
job._submit()
return job

def _run(self, pubs: Iterable[SamplerPub]) -> PrimitiveResult[PubResult]:
results = [self._run_pub(pub) for pub in pubs]
def _validate_pubs(self, pubs: list[SamplerPub]):
for i, pub in enumerate(pubs):
if len(pub.circuit.cregs) == 0:
warnings.warn(
f"The {i}-th pub's circuit has no output classical registers and so the result "
"will be empty. Did you mean to add measurement instructions?",
UserWarning,
)

def _run(self, pubs: list[SamplerPub]) -> PrimitiveResult[SamplerPubResult]:
pub_dict = defaultdict(list)
# consolidate pubs with the same number of shots
for i, pub in enumerate(pubs):
pub_dict[pub.shots].append(i)

results = [None] * len(pubs)
for shots, lst in pub_dict.items():
# run pubs with the same number of shots at once
pub_results = self._run_pubs([pubs[i] for i in lst], shots)
# reconstruct the result of pubs
for i, pub_result in zip(lst, pub_results):
results[i] = pub_result
return PrimitiveResult(results)

def _run_pub(self, pub: SamplerPub) -> PubResult:
circuit, qargs, meas_info = _preprocess_circuit(pub.circuit)
def _run_pubs(self, pubs: list[SamplerPub], shots: int) -> list[SamplerPubResult]:
"""Compute results for pubs that all require the same value of ``shots``."""
circuits = [pub.circuit for pub in pubs]
parameter_binds = [_convert_parameter_bindings(pub) for pub in pubs]

# adjust run_options not to overwrite existings options
run_options = self.options.run_options.copy()
for key in ["shots", "parameter_binds", "memory"]:
if key in run_options:
del run_options[key]
if self._seed is not None and "seed_simulator" in run_options:
del run_options["seed_simulator"]

# run circuits
result = self._backend.run(
circuits,
shots=shots,
seed_simulator=self._seed,
parameter_binds=parameter_binds,
memory=True,
**run_options,
).result()

result_memory = _prepare_memory(result)

# pack memory to an ndarray of uint8
results = []
start = 0
for pub in pubs:
meas_info, max_num_bytes = _analyze_circuit(pub.circuit)
p_v = pub.parameter_values
end = start + p_v.size
results.append(
self._postprocess_pub(
result_memory[start:end],
shots,
p_v.shape,
meas_info,
max_num_bytes,
result.metadata,
)
)
start = end

# convert to parameter bindings
parameter_values = pub.parameter_values
parameter_binds = {}
param_array = parameter_values.as_array(circuit.parameters)
parameter_binds = {p: param_array[..., i].ravel() for i, p in enumerate(circuit.parameters)}
return results

def _postprocess_pub(
self,
result_memory: list[list[str]],
shots: int,
shape: tuple[int, ...],
meas_info: list[_MeasureInfo],
max_num_bytes: int,
metadata: dict,
) -> SamplerPubResult:
"""Converts the memory data into an array of bit arrays with the shape of the pub."""
arrays = {
item.creg_name: np.zeros(
parameter_values.shape + (pub.shots, item.num_bytes), dtype=np.uint8
)
item.creg_name: np.zeros(shape + (shots, item.num_bytes), dtype=np.uint8)
for item in meas_info
}
memory_array = _memory_array(result_memory, max_num_bytes)

metadata = {"shots": pub.shots}
if qargs:
circuit.measure_all()
result = self._backend.run(
circuit,
shots=pub.shots,
seed_simulator=self._seed,
parameter_binds=[parameter_binds],
).result()
all_counts = result.get_counts()

for index, counts in np.ndenumerate(all_counts):
samples = []
for k, v in counts.items():
k = k.replace(" ", "")
kk = ""
for q in qargs:
kk = k[circuit.num_qubits - 1 - q] + kk

samples.append((np.fromiter(kk, dtype=np.uint8), v))

samples_array = np.array([sample for sample, v in samples for _ in range(0, v)])

for item in meas_info:
ary = _samples_to_packed_array(samples_array, item.num_bits, item.qreg_indices)
arrays[item.creg_name][index] = ary
metadata["simulator_metadata"] = result.metadata
else:
for index in np.ndenumerate(parameter_values.shape):
samples = [""] * pub.shots
samples_array = np.array(
[np.fromiter(sample, dtype=np.uint8) for sample in samples]
)
for item in meas_info:
ary = _samples_to_packed_array(samples_array, item.num_bits, item.qreg_indices)
arrays[item.creg_name][index] = ary
for samples, index in zip(memory_array, np.ndindex(*shape)):
for item in meas_info:
ary = _samples_to_packed_array(samples, item.num_bits, item.start)
arrays[item.creg_name][index] = ary

data_bin_cls = make_data_bin(
[(item.creg_name, BitArray) for item in meas_info],
shape=parameter_values.shape,
)
meas = {
item.creg_name: BitArray(arrays[item.creg_name], item.num_bits) for item in meas_info
}
data_bin = data_bin_cls(**meas)
return PubResult(data_bin, metadata=metadata)


def _preprocess_circuit(circuit: QuantumCircuit):
num_bits_dict = {creg.name: creg.size for creg in circuit.cregs}
mapping = _final_measurement_mapping(circuit)
qargs = sorted(set(mapping.values()))
qargs_index = {v: k for k, v in enumerate(qargs)}
circuit = circuit.remove_final_measurements(inplace=False)
if _has_control_flow(circuit):
raise QiskitError("StatevectorSampler cannot handle ControlFlowOp")
if _has_measure(circuit):
raise QiskitError("StatevectorSampler cannot handle mid-circuit measurements")
# num_qubits is used as sentinel to fill 0 in _samples_to_packed_array
sentinel = len(qargs)
indices = {key: [sentinel] * val for key, val in num_bits_dict.items()}
for key, qreg in mapping.items():
creg, ind = key
indices[creg.name][ind] = qargs_index[qreg]
meas_info = [
_MeasureInfo(
creg_name=name,
num_bits=num_bits,
num_bytes=_min_num_bytes(num_bits),
qreg_indices=indices[name],
return SamplerPubResult(
DataBin(**meas, shape=shape), metadata={"simulator_metadata": metadata}
)
for name, num_bits in num_bits_dict.items()
]
return circuit, qargs, meas_info


def _samples_to_packed_array(
samples: NDArray[np.uint8], num_bits: int, indices: list[int]
) -> NDArray[np.uint8]:
# samples of `Statevector.sample_memory` will be in the order of
# qubit_last, ..., qubit_1, qubit_0.
# reverse the sample order into qubit_0, qubit_1, ..., qubit_last and
# pad 0 in the rightmost to be used for the sentinel introduced by _preprocess_circuit.
ary = np.pad(samples[:, ::-1], ((0, 0), (0, 1)), constant_values=0)
# place samples in the order of clbit_last, ..., clbit_1, clbit_0
ary = ary[:, indices[::-1]]
# pad 0 in the left to align the number to be mod 8
# since np.packbits(bitorder='big') pads 0 to the right.
pad_size = -num_bits % 8
ary = np.pad(ary, ((0, 0), (pad_size, 0)), constant_values=0)
# pack bits in big endian order
ary = np.packbits(ary, axis=-1)
return ary


def _final_measurement_mapping(circuit: QuantumCircuit) -> dict[tuple[ClassicalRegister, int], int]:
"""Return the final measurement mapping for the circuit.

Parameters:
circuit: Input quantum circuit.

Returns:
Mapping of classical bits to qubits for final measurements.
"""
active_qubits = set(range(circuit.num_qubits))
active_cbits = set(range(circuit.num_clbits))

# Find final measurements starting in back
mapping = {}
for item in circuit[::-1]:
if item.operation.name == "measure":
loc = circuit.find_bit(item.clbits[0])
cbit = loc.index
qbit = circuit.find_bit(item.qubits[0]).index
if cbit in active_cbits and qbit in active_qubits:
for creg in loc.registers:
mapping[creg] = qbit
active_cbits.remove(cbit)
elif item.operation.name not in ["barrier", "delay"]:
for qq in item.qubits:
_temp_qubit = circuit.find_bit(qq).index
if _temp_qubit in active_qubits:
active_qubits.remove(_temp_qubit)

if not active_cbits or not active_qubits:
break

return mapping


def _has_control_flow(circuit: QuantumCircuit) -> bool:
return any(isinstance(instruction.operation, ControlFlowOp) for instruction in circuit)


def _convert_parameter_bindings(pub: SamplerPub) -> dict:
circuit = pub.circuit
parameter_values = pub.parameter_values
parameter_binds = {}
param_array = parameter_values.as_array(circuit.parameters)
parameter_binds = {p: param_array[..., i].ravel() for i, p in enumerate(circuit.parameters)}
return parameter_binds


def _prepare_memory(result: Result) -> list[list[str]]:
"""There is no split of results due to max_experiments for Aer"""
lst = []
for exp in result.results:
if hasattr(exp.data, "memory") and exp.data.memory:
lst.append(exp.data.memory)
else:
# no measure in a circuit
lst.append(["0x0"] * exp.shots)
return lst
Loading
Loading