Module ateams.statistics

Sub-modules

ateams.statistics.accepts
ateams.statistics.autocorrelation
ateams.statistics.cutoffs
ateams.statistics.observables
ateams.statistics.schedules

Functions

def always(model=None, distribution=None, burnIn=None)

An acceptance function which always accepts the proposed state; we do nothing with the arguments to this function.

Args

model : Model
Optional argument; does nothing.
distribution : Callable
Optional argument; does nothing.
burnIn : int
Optional argument; does nothing.

Returns

A function which always returns True.

def constant(temperature)

A constant annealing schedule.

Args

temperature : float
The temperature to be returned.

Returns

A function passed to a model constructor.

def critical(field)

A constant annealing schedule which calculates the critical temperature of the model.

Args

field : int
The order of the field we're over.

Returns

A function passed to a Model constructor that returns the critical temperature of the Potts model.

def integrated(X, isNormalized=False)

Computes (an estimate of) the integrated autocorrelation time \tau_X of the observable X by \tau_X = \frac 12 \sum_{t=-M}^M \overline \rho_X(t) = \frac 12 + \sum_{t=1}^M \overline \rho_X(t), where \overline \rho_X(t) is the normalized autocorrelation for X. Computes \tau_X for all 1 \leq M \leq N-1 for N = |X| the number of observations.

Args

X : np.array
A NumPy array of numerical values.
isNormalized : boolean=False
Are the data passed to X already normalized autocorrelation times? If not, we first compute \overline \rho_X.

Returns

(Estimates of) the integrated autocorrelation times.

def normalized(X)

Computes the vector \overline \rho_X of normalized autocorrelations \overline \rho_X(t) = \rho_X(t)/\rho_X(0) for the observable X.

Args

X : np.array
A NumPy array of numerical values.

Returns

A NumPy array of normalized autocorrelation values.

def occupancy(state, model=None)

Computes the bond occupancy \mathcal N(\sigma, \omega) = \sum_{x \in X} \mathbf 1[ \omega(x) = 1 ] of the state produced by the given model, where \omega is a bond(/plaquette) configuration. (Just counts the number of d-cells included by independent percolation conditioned on a spin configuration \sigma.) Note: at every step of a simulation, each Model returns a binary vector over the d-cells indicating which are in/excluded; the occupancy can be computed (as is done here) by .sum()ming on this vector.

Args

state : tuple
The state from the Markov chain on the given model.
model : Model=None
The Model (e.g. SwendsenWang) from which data is collected. Doesn't do anything here.

Returns

The bond occupancy.

def rectangular(Z, c=7)

Computes the rectangular cutoff for the integrated autocorrelation time estimator Z — this cutoff is (approximately) the number of "good" estimates of the integrated autocorrelation time before random walk-y behavior occurs, as in Ossola and Sokal (2004).

Args

Z : np.array
NumPy array of integrated autocorrelation values.
c : int=7
Appropriate scaling constant.
def totalEnergy(state, model)

Computes the total energy (or, more precisely, the net energy) \mathcal E of a given state produced by the given model. The total energy is given by \mathcal E (\sigma) = \sum_{x \in X} (-1)^{1-\mathbf 1\left[ \sigma(\partial x) = 0 \right]} = \sum_{x \in X} \mathbf 1\left[ \sigma(\partial x) = 0 \right] - \sum_{x \in X} \mathbf 1\left[ \sigma(\partial x) \neq 0 \right], where the x \in X are d-cells and \sigma is a spin configuration on (d-1)-cells.

Args

state : tuple
The state from the Markov chain on the given model.
model : Model
The Model (e.g. SwendsenWang) from which data is collected.

Returns

The total (net) energy.

Classes

class Chain (model,
accept=<function always.<locals>._>,
statistics={},
steps=10000)

Simulates a Markov chain on the given Model. For example, to generate 1000 samples from 2-dimensional Potts lattice gauge theory with coefficients in \mathbb Z/5\mathbb Z and store the spins on each 1-cell and the occupied 2-cells, use something like

from ateams import Chain, Recorder
from ateams.statistics import critical
from ateams.complexes import Cubical
from ateams.models import SwendsenWang

# Construct the cubical complex; instantiate model, chain.
C = Cubical().fromCorners([10]*4)
SW = SwendsenWang(C, dimension=2, field=5, temperature=critical(5))
M = Chain(SW, steps=1000)

# Run the chain, storing the output.
with Recorder().record("out.lz") as rec:
        for (spins, occupied) in M.progress():
                # You can store as many outputs as you'd like --- even ones
                # that aren't spit out by the model --- but they must be passed
                # to the `.store()` method as a tuple.
                rec.store((spins, occupied))

The Recorder class stores the differences between consecutive iterations using the LZ4 compression schema, compressing the diffs in chunks of (default) size 100. You can replay the data with the Player class:

from ateams import Player

with Player().playback("out.lz", steps=1000) as play:
        for (spins, occupied) in play.progress():
                <do whatever>

Running the sampler and recording the data takes ~26 seconds (~38 it/s) on an M2 MacBook Air; replaying the data takes ~0 seconds (~24,211 it/s). The size of out.lz is ~1.8MB, so storing each cell's data requires 1/11th of a byte per iteration (amortized).

Args

model : Model
A Model (e.g. SwendsenWang) generating the Markov chain.
accept : Callable
A function that consumes the complex, model, and state to determine whether we're going to a good place.
statistics : dict
A mapping of names to functions which take the complex, model, and state as argument. The Chain keeps track of these at each iteration and stores whatever output is given.
steps : int
The number of iterations in the chain.
Expand source code
class Chain:
        """
        Simulates a Markov chain on the given Model. For example, to generate 1000 samples
        from 2-dimensional Potts lattice gauge theory with coefficients in \(\mathbb Z/5\mathbb Z\)
        and store the spins on each 1-cell and the occupied 2-cells, use something like

        ```python
        from ateams import Chain, Recorder
        from ateams.statistics import critical
        from ateams.complexes import Cubical
        from ateams.models import SwendsenWang

        # Construct the cubical complex; instantiate model, chain.
        C = Cubical().fromCorners([10]*4)
        SW = SwendsenWang(C, dimension=2, field=5, temperature=critical(5))
        M = Chain(SW, steps=1000)

        # Run the chain, storing the output.
        with Recorder().record("out.lz") as rec:
                for (spins, occupied) in M.progress():
                        # You can store as many outputs as you'd like --- even ones
                        # that aren't spit out by the model --- but they must be passed
                        # to the `.store()` method as a tuple.
                        rec.store((spins, occupied))
        ```

        The `Recorder` class stores the differences between consecutive iterations using
        the [LZ4 compression schema](github.com/lz4/lz4), compressing the diffs in
        chunks of (default) size 100. You can replay the data with the `Player`
        class:
        
        ```python
        from ateams import Player

        with Player().playback("out.lz", steps=1000) as play:
                for (spins, occupied) in play.progress():
                        <do whatever>
        ```

        Running the sampler and recording the data takes ~26 seconds (~38 it/s) on
        an M2 MacBook Air; replaying the data takes ~0 seconds (~24,211 it/s). The
        size of `out.lz` is ~1.8MB, so storing each cell's data requires \(1/11\)th
        of a byte per iteration (amortized).
        """
        def __init__(self, model, accept=always(), statistics={}, steps=10000):
                """
                Args:
                        model (Model): A Model (e.g. `ateams.models.SwendsenWang`)
                                generating the Markov chain.
                        accept (Callable): A function that consumes the complex, model, and
                                state to determine whether we're going to a good place.
                        statistics (dict): A mapping of names to functions which take the complex,
                                model, and state as argument. The Chain keeps track of these at
                                each iteration and stores whatever output is given.
                        steps (int): The number of iterations in the chain.
                """
                self.model = model
                self.steps = steps
                self.accept = accept
                self._exitcode = 0
                self._warnings = 0

                # Store stats and things.
                self.functions = statistics
                self.statistics = { name: [] for name in self.functions.keys() }


        def __iter__(self):
                """
                Initializes the Chain object as a generator.
                """
                self.step = 0
                self.state = tuple([self.model.spins] + []*(self.model._returns-1))
                return self
        

        def __next__(self):
                """
                Performs the computations specified by the proposal and acceptance schemes.
                """
                # While we haven't reached the max number of steps, propose a new plan,
                # check whether it's acceptable/valid, and continue.
                while self.step < self.steps:
                        # Propose the next state and check whether we want to accept it as
                        # the next state or not; assign whichever state is chosen to the
                        # Model.
                        try: proposed = self.model._proposal(self.step)
                        except NumericalInstabilityWarning:
                                self._exitcode = 2
                                self._warnings += 1
                                proposed = self.state

                        self.state = proposed if self.accept(self.state, proposed, self.step) else self.state
                        self.model._assign(self.state[0])

                        # Compute statistics.
                        for name, function in self.functions.items():
                                self.statistics[name].append(function(self.model, self.state))
                        
                        # Iterate.
                        self.step += 1
                        
                        return self.state
                
                raise StopIteration
        
        
        def progress(self, dynamic_ncols=True, desc=""):
                """
                Progress bar.

                Returns:
                        `tqdm` iterable.
                """
                from tqdm.auto import tqdm
                import sys

                # stream = sys.stdout if self.model._DEBUG else sys.stderr
                stream = sys.stderr
                return tqdm(self, file=stream, total=self.steps, dynamic_ncols=dynamic_ncols, desc=desc)

Methods

def progress(self, dynamic_ncols=True, desc='')

Progress bar.

Returns

tqdm iterable.

class Player

Safely "replays" recorded Chain output.

Expand source code
class Player():
        """
        Safely "replays" recorded `Chain` output.
        """
        def __init__(self): pass

        def playback(self, fp:str, steps=1000, dtypes=(int, int, int)):
                """
                Initialize playback; context management.

                Args:
                        fp (str): File in which records are stored.
                        steps (int=1000): How many steps in the chain? Only relevant for
                                displaying the progress bar.
                        dtypes (tuple=(int, int, int)): NumPy dtypes for reporting data.

                Returns:
                        This Player object.
                """
                # Configure filepath, reader.
                self._fp = fp
                self._reader = lz4.frame.LZ4FrameFile(f"{self._fp}", mode="rb")
                self._dtypes = dtypes

                # Similar setup to the Recorder.
                self._current = None
                self._currentized = False

                self._loaded = []
                self._remaining = 0
                self._blocksize = 0
                self._configurationsize = -1

                # Compression markers.
                self._intrablockbreak = '<<#>>'
                self._interblockbreak = '<<<##>>>'
                self._interlistbreak = '<>'
                self._listsep = ' '

                # Number of steps (progress bar only).
                self._steps = steps

                # Enter context management.
                return self.__enter__()
        

        def __iter__(self):
                """
                This `Player` is a generator which yields states of the recorded chain.
                """
                return self


        def __next__(self):
                """
                Iteration magic method.
                """
                # If there are no more configurations remaining in this block, load the
                # next block.
                if self._remaining == 0:
                        # Check whether we're EOF.
                        try:
                                unconfigured = self._reader.readline()
                                assert unconfigured != b''
                        except:
                                raise StopIteration
                        
                        # Split on block markers...
                        unpackedlines = unconfigured.decode().strip().split(self._interblockbreak)

                        # ... then on intra-block markers (indicating iterations) ...
                        unpackediterations = [l.split(self._intrablockbreak) for l in unpackedlines]

                        # ... then on intra-list markers (indicating different arrays within each iteration)...
                        self._loaded = [[s.split(self._interlistbreak) for s in l] for l in unpackediterations]
                        self._loaded = [
                                [
                                        [np.fromstring(i, sep=self._listsep, dtype=int), np.fromstring(v, sep=self._listsep, dtype=int)]
                                        for i, v in iteration
                                ]
                                for iteration in self._loaded   
                        ]

                        # Compute the number of configurations that need to be reported and
                        # the size of the currently-loaded block (in iterations).
                        self._remaining = len(self._loaded)
                        self._blocksize = len(self._loaded)

                        # Check if we need to load an initial configuration; this only
                        # happens on the first step.
                        if not self._currentized:
                                self._current = tuple([self._loaded[0][i][1] for i in range(len(self._loaded[0]))])
                                self._configurationsize = len(self._current)
                                self._currentized = True

                # Load the next configuration, modify the current one.
                nextup = self._loaded[self._blocksize-self._remaining]

                for i in range(self._configurationsize):
                        indices, values = nextup[i]
                        values = values.astype(self._dtypes[i])
                        self._current[i][indices] = values
                
                # Decrement the number of configurations remaining in the block, and
                # return the current(ly modified) configuration.
                self._remaining -= 1

                return self._current

        def __enter__(self):
                """
                Required context management magic method.
                """
                return self


        def __exit__(self, exc_type, exc_value, exc_tb):
                """
                Required context management magic method; kills the reader and the file.
                """
                self._reader.close()

        
        def progress(self):
                from tqdm.auto import tqdm
                return tqdm(self, total=self._steps)

Methods

def playback(self,
fp: str,
steps=1000,
dtypes=(<class 'int'>, <class 'int'>, <class 'int'>))

Initialize playback; context management.

Args

fp : str
File in which records are stored.
steps : int=1000
How many steps in the chain? Only relevant for displaying the progress bar.

dtypes (tuple=(int, int, int)): NumPy dtypes for reporting data.

Returns

This Player object.

def progress(self)
class Recorder

Safely "records" states from a Chain by tracking changes between successive states, and writing these changes — alongside the initial state — to file. WARNING: all input is flattened when recording and is NOT reshaped when playing back.

Expand source code
class Recorder:
        """
        Safely "records" states from a Chain by tracking changes between successive
        states, and writing these changes --- alongside the initial state --- to
        file. WARNING: all input is flattened when recording and is NOT reshaped when
        playing back.
        """
        def __init__(self): pass

        def record(self, fp: str, blocksize=100):
                """
                Called to configure the recording apparatus for the Chain, and *should*
                be used as a context manager (i.e. with the `with` statement).

                Args:
                        fp (str): Filename.
                        block (int=100): Number of states to bundle together before compressing. 

                Returns:
                        This `Recorder` object.

                
                """
                # Configure filepath, writer.
                self._fp = fp
                self._writer = lz4.frame.LZ4FrameFile(f"{self._fp}", mode="wb", compression_level=9)
                
                # Save the chain for access during iteration; set the "previous" state to
                # be all -1s, since the first state yielded by iterating over the Chain
                # is the initial state, and we need to record the whole thing; fix a list
                # of possible integer states for later.
                self.previous = None
                self._previous = False

                self._blocksize = blocksize
                self._block = []
                self._stored = 0

                # Compression markers.
                self._intrablockbreak = '<<#>>'
                self._interblockbreak = '<<<##>>>'
                self._interlistbreak = '<>'
                
                # Enter the context manager.
                return self.__enter__()


        def store(self, state) -> None:
                """
                Stores a state yielded by iteration over a `Chain`.

                Args:
                        state (tuple): Tuple of NumPy arrays to be written to file.
                """
                # Check whether we've received any arguments before; if not, initialize
                # a list to keep track of "previous" arguments. We expect the arguments
                # here to be numpy arrays.

                # Check whether we have any previously-stored data.
                if not self._previous:
                        self.previous = tuple(-np.ones(shape=s.flatten().shape, dtype=s.dtype) for s in state)
                        self._previous = True

                subblock = []
                
                # Get diffs; store.
                for i in range(len(state)):
                        # Compute the diff and store.
                        curr = state[i].flatten()
                        diff = np.flatnonzero(~np.equal(curr, self.previous[i]))
                        self.previous[i][diff] = curr[diff][::]

                        # Add the encoded dictionary of diffs to the subblock.
                        subblock.append(f"{' '.join(diff.astype(str))}{self._interlistbreak}{' '.join(curr[diff].astype(str))}")

                # Increment the number of iterations we've stored; if we've reached the
                # limit, compress, write to file, and begin again.
                self._block.append(subblock)
                self._stored += 1

                if self._stored == self._blocksize:
                        self._writer.write(
                                (self._interblockbreak.join(
                                        self._intrablockbreak.join(subblock)
                                        for subblock in self._block
                                ) + "\n").encode()
                        )

                        # Flush the block/#stored.
                        self._block = []
                        self._stored = 0

        
        def __enter__(self):
                """
                Required context management magic method.
                """
                return self


        def __exit__(self, exc_type, exc_value, exc_tb):
                """
                Required context management magic method: writes what's left of the cache
                to file, then closes the writer.
                """
                # If we've finished storing but there's still stuff left in the cache,
                # write that to file too.
                if len(self._block):
                        self._writer.write(
                                (self._interblockbreak.join(
                                        self._intrablockbreak.join(subblock)
                                        for subblock in self._block
                                ) + "\n").encode()
                        )
                
                self._writer.close()

Methods

def record(self, fp: str, blocksize=100)

Called to configure the recording apparatus for the Chain, and should be used as a context manager (i.e. with the with statement).

Args

fp : str
Filename.
block : int=100
Number of states to bundle together before compressing.

Returns

This Recorder object.

def store(self, state) ‑> None

Stores a state yielded by iteration over a Chain.

Args

state : tuple
Tuple of NumPy arrays to be written to file.