import numpy as np
from pyimzml.ImzMLParser import ImzMLParser, getionimage
from joblib import Parallel, delayed
from functools import partial
import anndata as ad
import pandas as pd
import geopandas as gpd
from shapely.geometry import box
import numpy as np
from anndata import AnnData
from spatialdata import SpatialData
from spatialdata.models import PointsModel, Image2DModel, TableModel, ShapesModel
from spatialdata.transformations import Identity
import pkg_resources
import os
[docs]
def parmap(f, X, nprocs=None):
"""
Parallel map using joblib (more robust for Jupyter).
Parameters
----------
f : callable
Function to apply to each element
X : iterable
Input data
nprocs : int, optional
Number of processes (default: -1, all CPUs)
Returns
-------
list
Results in same order as input
"""
if nprocs is None:
nprocs = -1 # Use all CPUs
return Parallel(n_jobs=nprocs, backend='loky')(
delayed(f)(x) for x in X
)
[docs]
def getimage(peak, path, tol = 0.1):
p = ImzMLParser(path) #individual file pointers otherwise parsing is corrupted
return getionimage(p, peak, tol=tol, reduce_func=max)
[docs]
def rd_peaks(fn):
data = []
with open(fn) as f:
f.readline() #header
for line in f:
ss = line.split()
if ss[0].strip('"') == 'M': continue
data.append(float(ss[1]))
return data
[docs]
def rd_peaks_from_package():
# Try to get the file from the package
peaks_path = pkg_resources.resource_filename('goatpy', 'data/PEAKS.csv')
if not os.path.exists(peaks_path):
raise FileNotFoundError(f"PEAKS.csv not found at {peaks_path}")
with open(peaks_path, 'r') as f:
data = []
f.readline() # skip header
for line in f:
ss = line.split()
if ss[0].strip('"') == 'M':
continue
data.append(float(ss[1]))
return data
[docs]
def glyco_spatialdata(imzml_path, peaks_path = None, tol = 0.1, pixel_size = 20):
# Load Peaks
if peaks_path is None:
peaks = rd_peaks_from_package()
else:
peaks = rd_peaks(peaks_path)
# Load ImzML data
getimg = partial(getimage, path=imzml_path, tol = tol)
spectra_all = np.stack(
parmap(getimg, peaks, 10),
axis=-1
)
# Load Spatial Info
p = ImzMLParser(imzml_path)
coords = np.array(p.coordinates)[:, :2] # (x, y)
coords = coords - 1 # convert from 1-based to 0-based indexing
# Create AnnData Object
spectra_flat = np.array([spectra_all[y-1, x-1, :] for x, y in coords])
anndata = ad.AnnData(spectra_flat, dtype=np.float32)
anndata.var_names = np.array(["%.1f" % p for p in peaks])
anndata.obs_names = np.array(list(map(str, range(len(coords)))))
anndata.obs["full_x"] = coords[:, 0]
anndata.obs["full_y"] = coords[:, 1]
anndata.obs["x"] = anndata.obs["full_x"] - anndata.obs["full_x"].min()
anndata.obs["y"] = anndata.obs["full_y"] - anndata.obs["full_y"].min()
anndata.obsm["spatial"] = np.column_stack([anndata.obs["x"], anndata.obs["y"]])
# Calculate Total Ion Count (TIC)
anndata.obs["MPI"] = np.ravel(anndata.X.sum(axis=1))
# Create SpatialData Object
coords = pd.DataFrame({
"x": [c for c in anndata.obs["x"]],
"y": [c for c in anndata.obs["y"]],
})
coords["instance_id"] = coords.index.astype(str) # unique ID for each pixel
coords["region"] = "pixels" # must exist for TableModel
df = pd.concat(
[
coords.reset_index(drop=True),
pd.DataFrame(anndata.X, columns=("mz-" + anndata.var.index))
],
axis=1
)
points = PointsModel.parse(df)
gdf = centroids_to_pixel_squares(df, x_col="x", y_col="y", pixel_size=1.0)
shapes = ShapesModel.parse(
gdf[["instance_id", "geometry"]],
transformations={"global": Identity()},
)
sdata = SpatialData(points={"centroids": points},
shapes={"pixels": shapes})
adata = AnnData(
X=anndata.X,
obs=coords, # contains x, y, point_id, region
var=pd.DataFrame(index=("mz-" + anndata.var.index))
)
adata.obs = pd.concat(
[
adata.obs.reset_index(drop=True),
anndata.obs.drop(columns=["x", "y"]).iloc[:adata.n_obs].reset_index(drop=True)
],
axis=1
)
coords = np.array(adata.obs[["x", "y"]])
adata.obsm["spatial"] = coords
adata.obs["instance_id"] = sdata["pixels"].index
adata.obs["region"] = "pixels"
adata.obs["region"].astype("category")
table = TableModel.parse(
adata,
region="pixels", # name of your PointsModel
region_key="region", # must exist in adata.obs
instance_key="instance_id" # unique per row
)
# --- 7. Add to SpatialData ---
sdata.tables["maldi_adata"] = table
return sdata
[docs]
def centroids_to_pixel_squares(df, x_col="x", y_col="y", pixel_size=1.0):
half = pixel_size / 2
geometries = [
box(x - half, y - half, x + half, y + half)
for x, y in zip(df[x_col], df[y_col])
]
gdf = gpd.GeoDataFrame(
df.copy(),
geometry=geometries,
)
return gdf