Preface
- This article takes a first, shallow look at the scanpy module in a spatial transcriptomics analysis: the basic standard workflow, how the data is passed along through the source-code calls, and which lower-level functions the wrappers ultimately invoke.
- This walkthrough covers only the scanpy source code; external modules are left for a later post.
1. Import the required modules
import scanpy as sc
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from matplotlib import rcParams
import seaborn as sb
import SpatialDE
import os,sys
2. Prepare the data
Download the dataset "V1_Breast_Cancer_Block_A_Section_1"; you can download it yourself from the official 10x Genomics database using that name.
# Files under the data directory:
'''
['filtered_feature_bc_matrix.h5',
'pnas.1912459116.sd12.csv',
'pnas.1912459116.sd12_1.csv',
'pnas.1912459116.sd15.xlsx.xlsx',
'spatial']
'''
### Alternatively, use scanpy's datasets.visium_sge function to download it online; it returns an AnnData object
adata = sc.datasets.visium_sge(sample_id="V1_Breast_Cancer_Block_A_Section_1")
3. Environment settings
plt.rcParams['figure.figsize'] = (8, 8)  # matplotlib figure size, in inches
sc.settings.verbosity = 3  # verbosity controls how detailed the logging is; larger means more detail: errors (0), warnings (1), info (2), hints (3)
sc.logging.print_versions()  # print package versions
sc.settings.set_figure_params(dpi=80)  # set_figure_params sets the figure resolution; dpi=80 means 80 pixels per inch
4. Read the data
adata = sc.read_visium("V1_Breast_Cancer_Block_A_Section_1")
####### First, let's look at the data structure of adata.
AnnData object with n_obs × n_vars = 3813 × 33538
obs: 'in_tissue', 'array_row', 'array_col'
var: 'gene_ids', 'feature_types', 'genome'
uns: 'spatial'
obsm: 'spatial'
# The individual attributes are shown below:
print(adata.X)  # adata.X holds the count data as a sparse matrix
(0, 33508) 33.0
(0, 33506) 7.0
(0, 33505) 49.0
(0, 33504) 1.0
adata.var  # adata.var is a DataFrame of gene information; its index is the gene name
gene_ids feature_types genome
MIR1302-2HG ENSG00000243485 Gene Expression GRCh38
FAM138A ENSG00000237613 Gene Expression GRCh38
OR4F5 ENSG00000186092 Gene Expression GRCh38
AL627309.1 ENSG00000238009 Gene Expression GRCh38
adata.obs  # adata.obs is a DataFrame of cell (spot) information; its index is the barcode, i.e. what we treat as the cell
in_tissue array_row array_col
AAACAAGTATCTCCCA-1 1 50 102
AAACACCAATAACTGC-1 1 59 19
AAACAGAGCGACTCCT-1 1 14 94
AAACAGGGTCTATATT-1 1 47 13
AAACAGTGTTCCTGGG-1 1 73 43
adata.uns['spatial']['V1_Breast_Cancer_Block_A_Section_1'].keys()  # adata.uns holds unstructured annotations, mainly the images, scale factors and metadata, as a dict with the following keys:
'''
dict_keys(['images', 'scalefactors', 'metadata'])
'''
adata.obsm  # multidimensional per-observation annotations (e.g. coordinate matrices)
# Later steps of the analysis will add more information on top of adata, for example:
adata.varm  # analogous to obsm, but multidimensional per-gene annotations
adata.obsp  # pairwise annotations of obs (obs x obs matrices), e.g. distances and connectivities
adata.layers  # alternative matrices with the same shape as adata.X (e.g. raw or differently normalized counts)
# To get an overview of the data, we can use the following attributes and methods.
adata.n_obs, adata.n_vars, adata.shape  # dimension counts
adata.obs_keys()  # names of the obs annotation columns
adata.obs_names, adata.obs.head()  # cell (barcode) information
adata.var.index, adata.var_names.to_list()  # gene information
# Look at the parameters of the read_visium function.
def read_visium(
    path: Union[str, Path],
    genome: Optional[str] = None,  # only keep genes that belong to this genome
    *,
    count_file: str = "filtered_feature_bc_matrix.h5",  # default name of the count file
    library_id: str = None,
    load_images: Optional[bool] = True,  # load the tissue images by default
    source_image_path: Optional[Union[str, Path]] = None,
) -> AnnData:
- The count data is actually read from "filtered_feature_bc_matrix.h5" by the read_10x_h5 function; if the file name differs, it can be specified with the count_file parameter. The returned adata is an AnnData object.
adata = read_10x_h5(path / count_file, genome=genome)
# --------------------
def read_10x_h5(
) -> AnnData:
- The function reads the PNG files under spatial/ with the imread function.
if load_images:
    files = dict(
        tissue_positions_file=path / 'spatial/tissue_positions_list.csv',
        scalefactors_json_file=path / 'spatial/scalefactors_json.json',  # stores the scale-factor parameters
        hires_image=path / 'spatial/tissue_hires_image.png',
        lowres_image=path / 'spatial/tissue_lowres_image.png',
    )
# ----------------------------
for res in ['hires', 'lowres']:
try:
adata.uns["spatial"][library_id]['images'][res] = imread(
str(files[f'{res}_image'])
)
- The obs and obsm data come from the 'spatial/tissue_positions_list.csv' file.
positions = pd.read_csv(files['tissue_positions_file'], header=None)
# The file has 6 columns; the last two, ['pxl_row_in_fullres', 'pxl_col_in_fullres'], are the ones used below
# ---------------------------
adata.obs = adata.obs.join(positions, how="left")
adata.obsm['spatial'] = adata.obs[
    ['pxl_row_in_fullres', 'pxl_col_in_fullres']
].to_numpy()  # obsm keeps the spatial coordinates as an (n_obs, 2) array
adata.obs.drop(
columns=['barcode', 'pxl_row_in_fullres', 'pxl_col_in_fullres'],
inplace=True,
)
- So far we have not seen where adata.var comes from; presumably it is defined inside read_10x_h5.
adata = read_10x_h5(path / count_file, genome=genome)
def read_10x_h5(
    filename: Union[str, Path],
    genome: Optional[str] = None,
    gex_only: bool = True,  # by default only 'Gene Expression' features are kept
    backup_url: Optional[str] = None,  # URL from which to retrieve the file if it is missing
) -> AnnData:
# -------------------
with h5py.File(str(filename), 'r') as f:
adata = _read_v3_10x_h5(filename, start=start)
- So read_10x_h5 likewise opens the h5 file and then calls _read_v3_10x_h5 to do the actual reading.
# The AnnData constructor. -------------------
adata = AnnData(
    matrix,
    obs=dict(obs_names=dsets['barcodes'].astype(str)),  # the barcodes become the obs index here; the .join above later merges in the position data
    var=dict(
        var_names=dsets['name'].astype(str),
        gene_ids=dsets['id'].astype(str),
        feature_types=dsets['feature_type'].astype(str),
        genome=dsets['genome'].astype(str),
    ),
)
# Tracing dsets back to its source explains how the adata.X sparse matrix is built.
_collect_datasets(dsets, f["matrix"])  # every element under f['matrix'] that is an h5py.Dataset is put into the dsets dict
M, N = dsets['shape']
data = dsets['data']
matrix = csr_matrix(
    (data, dsets['indices'], dsets['indptr']),
    shape=(N, M),
)  # arrange the data as a sparse matrix
- So the data and var of adata (i.e. the dsets) ultimately come from reading the h5 file with h5py's File function.
A quick look at csr_matrix: it takes three main arrays, and indptr slices indices and data into rows. In the example below, the first row has 2-0=2 values, the second 5-2=3, and the third 7-5=2, so indices and data are split into chunks of 2, 3 and 2. The first row therefore contains the first two values of data, [1, 1], at the column positions given by the first two entries of indices, [1, 10]; shape fixes the number of rows and columns, and all remaining positions are filled with zeros. The second and third rows follow the same pattern, giving the matrix shown below.
indptr:  [0, 2, 5, 7]
indices: [1, 10, 1, 2, 11, 0, 1]
data:    [1, 1, 1, 1, 1, 1, 1]
matrix:
[0,1,0,0,0,0,0,0,0,0,1,0]
[0,1,1,0,0,0,0,0,0,0,0,1]
[1,1,0,0,0,0,0,0,0,0,0,0]
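- To make the slicing rule concrete, here is a small, self-contained check of the same example with scipy (my own illustration, not part of the scanpy source):
import numpy as np
from scipy.sparse import csr_matrix

indptr = np.array([0, 2, 5, 7])               # row boundaries into indices/data
indices = np.array([1, 10, 1, 2, 11, 0, 1])   # column positions of the non-zero values
data = np.ones(7)                             # the non-zero values themselves

# shape fixes the number of rows and columns; unlisted positions are zero
matrix = csr_matrix((data, indices, indptr), shape=(3, 12))
print(matrix.toarray().astype(int))
# [[0 1 0 0 0 0 0 0 0 0 1 0]
#  [0 1 1 0 0 0 0 0 0 0 0 1]
#  [1 1 0 0 0 0 0 0 0 0 0 0]]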
With that, we have a rough picture of how scanpy reads spatial transcriptomics data. One open question remains: how exactly h5py's File reads the h5 file; we leave that for a later discussion.
5. De-duplicating the index
adata.var_names_make_unique()
- During reading, "var_names=dsets['name'].astype(str)" uses the gene name by default; if the gene ID were used instead, de-duplication would not be needed.
def var_names_make_unique(self, join: str = "-"):
...
self.var_names = utils.make_index_unique(self.var.index, join)
# -------------------
def make_index_unique(index: pd.Index, join: str = "-"):  # digging one level further
    ...
    for i, v in enumerate(values_dup):
        while True:
            counter[v] += 1  # counter[v] records how many times v has occurred so far
            tentative_new_name = v + join + str(counter[v])
# So duplicated index values simply get a numeric suffix appended to form new, unique names; note that the number of features does not change.
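- A minimal sketch of the same renaming idea (my own illustration, not the anndata source): duplicated names get a join + counter suffix, so the length of the index stays the same.
from collections import Counter

def make_unique_sketch(names, join="-"):
    # rough illustration of the suffixing logic; the real implementation is anndata's make_index_unique
    seen, counter, out = set(), Counter(), []
    for v in names:
        new = v
        while new in seen:          # keep bumping the counter until the name is unused
            counter[v] += 1
            new = v + join + str(counter[v])
        seen.add(new)
        out.append(new)
    return out

print(make_unique_sketch(["TBCE", "TBCE", "LINC01238", "LINC01238"]))
# ['TBCE', 'TBCE-1', 'LINC01238', 'LINC01238-1']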
6. Plotting an overview of the data
sc.pl.highest_expr_genes(adata,n_top=20,save="highest_expr_genes.pdf")
- Plot the most highly expressed genes.
# highest_expr_genes first normalizes the data
norm_dict = normalize_total(adata, target_sum=100, inplace=False)  # normalizes each cell so that, after normalization, every cell's counts sum to the same total across genes
def normalize_total(
    adata: AnnData,
    target_sum: Optional[float] = None,  # if None, after normalization each cell's total counts equal the median of the per-cell totals before normalization
    exclude_highly_expressed: bool = False,
    max_fraction: float = 0.05,
    key_added: Optional[str] = None,
    layer: Optional[str] = None,
    layers: Union[Literal['all'], Iterable[str]] = None,
    layer_norm: Optional[str] = None,
    inplace: bool = True,  # whether to update adata in place or return a dict with normalized copies of adata.X and adata.layers; here it is called with inplace=False, so adata.X itself is left unchanged
    copy: bool = False,
) -> Optional[Dict[str, np.ndarray]]:
# --------------------------------------------
counts_per_cell = X.sum(1)
# ----------
mean_percent = norm_dict['X'].mean(axis=0).A1
top_idx = np.argsort(mean_percent)[::-1][:n_top]
counts_top_genes = norm_dict['X'][:, top_idx].A  # so the selected genes are simply the n_top genes with the largest mean count after normalization
# Set up the plotting parameters.
height = (n_top * 0.2) + 1.5
fig, ax = plt.subplots(figsize=(5, height))
sns.boxplot(data=counts_top_genes, orient='h', ax=ax, fliersize=1, **kwds)  ## boxplot of the normalized data, drawn with seaborn.boxplot
ax.set_xlabel('% of total counts')
_utils.savefig_or_show('highest_expr_genes', show=show, save=save)
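- The gene selection above can be reproduced with plain numpy/scipy on a toy matrix (my own illustration, not the scanpy source): normalize each cell to a total of 100, then take the genes with the largest mean normalized value.
import numpy as np
from scipy.sparse import csr_matrix, diags

X = csr_matrix(np.array([[10, 0, 90],
                         [ 5, 5, 40],
                         [ 1, 9, 10]], dtype=float))     # toy counts: 3 cells x 3 genes
counts_per_cell = np.asarray(X.sum(axis=1)).ravel()
X_norm = diags(100.0 / counts_per_cell) @ X              # every cell now sums to 100
mean_percent = np.asarray(X_norm.mean(axis=0)).ravel()   # mean % of total counts per gene
n_top = 2
top_idx = np.argsort(mean_percent)[::-1][:n_top]         # genes with the largest mean percentage
print(top_idx, mean_percent[top_idx])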
7. Data preprocessing
adata.var['mt'] = adata.var_names.str.startswith('MT-')  # flag mitochondrial genes
adata.obs['mt_frac'] = np.sum(adata[:, adata.var['mt']].X, axis=1).A1 / np.sum(adata.X, axis=1).A1  # fraction of counts from mitochondrial genes per cell
adata.obs['total_counts'] = adata.X.sum(axis=1).A1  # total counts per cell over all genes
# Plot the data before filtering
fig, axs = plt.subplots(1, 2, figsize=(15, 4))  # axs holds the generated Axes objects
fig.suptitle('Covariates for filtering')
sb.distplot(adata.obs['total_counts'], kde=False, ax=axs[0])  # frequency histogram via seaborn.distplot
sb.distplot(adata.obs['total_counts'][adata.obs['total_counts']<10000], kde=False, bins=40, ax = axs[1])
plt.savefig("./figures/displot_befor.pdf")  # this gives the frequency histogram of total_counts; ideally it should look roughly normal
# Filter cells
sc.pp.filter_cells(adata, min_counts=5000, max_counts=35000, min_genes=3000, max_genes=20000)  # filter cells
adata = adata[adata.obs['mt_frac'] < 0.2]  # adata is subset with a boolean mask
# Filter genes
sc.pp.filter_genes(adata, min_cells=10)
# Plot the data after filtering
##################### same code as above.
# The parameters and implementation of the two filter functions are shown below; a small mask example follows them.
def filter_cells(
    data: AnnData,
    min_counts: Optional[int] = None,  # minimum counts required per cell
    min_genes: Optional[int] = None,  # minimum number of genes detected per cell
    max_counts: Optional[int] = None,  # maximum counts allowed per cell
    max_genes: Optional[int] = None,  # maximum number of genes detected per cell
    inplace: bool = True,
    copy: bool = False,
) -> Optional[Tuple[np.ndarray, np.ndarray]]:
number_per_cell = np.sum(
X if min_genes is None and max_genes is None else X > 0, axis=1
)
if min_number is not None:
cell_subset = number_per_cell >= min_number
if max_number is not None:
cell_subset = number_per_cell <= max_number
# -----------------------------------------------
def filter_genes(
    data: AnnData,
    min_counts: Optional[int] = None,  # minimum total counts for a gene to be kept
    min_cells: Optional[int] = None,  # minimum number of cells in which a gene must be detected
    max_counts: Optional[int] = None,
    max_cells: Optional[int] = None,
    inplace: bool = True,
    copy: bool = False,
) -> Union[AnnData, None, Tuple[np.ndarray, np.ndarray]]:
number_per_gene = np.sum(
X if min_cells is None and max_cells is None else X > 0, axis=0
)
if min_number is not None:
gene_subset = number_per_gene >= min_number
if max_number is not None:
gene_subset = number_per_gene <= max_number
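- The masking logic of both filters boils down to a per-cell (or per-gene) sum plus a boolean comparison; a minimal numpy illustration with made-up numbers (not the scanpy source):
import numpy as np

X = np.array([[0, 3, 7],
              [1, 0, 0],
              [4, 2, 9]])                      # toy counts: 3 cells x 3 genes

counts_per_cell = X.sum(axis=1)                # counts per cell -> [10, 1, 15]
genes_per_cell = (X > 0).sum(axis=1)           # detected genes per cell -> [2, 1, 3]
cell_subset = counts_per_cell >= 5             # e.g. min_counts=5 -> [True, False, True]

cells_per_gene = (X > 0).sum(axis=0)           # cells a gene is detected in -> [2, 2, 2]
gene_subset = cells_per_gene >= 2              # e.g. min_cells=2 -> all True

filtered = X[cell_subset][:, gene_subset]      # subsetting with boolean masks, as adata[...] does
print(filtered)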
8. Normalization
sc.pp.normalize_total(adata, inplace=True)  # normalize again, but this time in place: adata.X itself is updated, whereas the earlier normalization was only used for plotting
sc.pp.log1p(adata)
def log1p(X, copy=False, base=None):  # (signature abridged)
    return log1p_array(X, copy=copy, base=base)  # depending on base, there are two variants
# ---------------------------------------------------
np.log1p(X, out=X)  # smoothing: brings the data closer to a Gaussian shape; log1p(x) = log(x + 1), whose inverse is expm1
if base is not None:
np.divide(X, np.log(base), out=X)
# np.log(base) is the natural logarithm of base; np.log10() would use base 10.
# np.divide performs element-wise division, X / np.log(base), which converts the natural log into a log with the given base.
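- In other words, when base is given the result is log_base(1 + x); a quick numerical check (my own illustration):
import numpy as np

X = np.array([0.0, 1.0, 9.0, 99.0])
base = 10
manual = np.log1p(X) / np.log(base)   # what the code above does: natural log1p, then divide by ln(base)
direct = np.log10(1 + X)              # log base 10 of (1 + x)
print(np.allclose(manual, direct))    # True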
9. Identifying highly variable genes
sc.pp.highly_variable_genes(adata, flavor='seurat', n_top_genes=2000, inplace=True)
# From the change in adata we can see that adata.var gains several new columns: ['highly_variable', 'means', 'dispersions', 'dispersions_norm']
def highly_variable_genes(
adata: AnnData,
    layer: Optional[str] = None,  # compute on this layer instead of adata.X
n_top_genes: Optional[int] = None,
min_disp: Optional[float] = 0.5,
max_disp: Optional[float] = np.inf,
min_mean: Optional[float] = 0.0125,
max_mean: Optional[float] = 3,
span: Optional[float] = 0.3,
n_bins: int = 20,
flavor: Literal['seurat', 'cell_ranger', 'seurat_v3'] = 'seurat',
subset: bool = False,
inplace: bool = True,
    batch_key: Optional[str] = None,  # obs key marking the batch
check_values: bool = True,
) -> Optional[pd.DataFrame]:
# the per-batch processing function
df = _highly_variable_genes_single_batch(
........
)
if flavor == 'seurat':
    if 'log1p' in adata.uns_keys() and adata.uns['log1p']['base'] is not None:
        X *= np.log(adata.uns['log1p']['base'])
    X = np.expm1(X)  # at this point X is transformed back (expm1) to the normalized, non-log scale
mean[mean == 0] = 1e-12  # replace zero means with a tiny value so they can safely be used as divisors
dispersion = var / mean
if flavor == 'seurat':  # logarithmized mean as in Seurat
    dispersion[dispersion == 0] = np.nan
    dispersion = np.log(dispersion)
    mean = np.log1p(mean)  # the dispersion is computed from mean and var
# ------------------------------------- A side note on technique: how to iterate over a sparse matrix and compute means and variances.
means = np.zeros(major_len, dtype=dtype)
variances = np.zeros_like(means, dtype=dtype)
for i in range(major_len):
startptr = indptr[i]
endptr = indptr[i + 1]
counts = endptr - startptr
for j in range(startptr, endptr):
means[i] += data[j]
means[i] /= minor_len
for j in range(startptr, endptr):
diff = data[j] - means[i]
variances[i] += diff * diff
variances[i] += (minor_len - counts) * means[i] ** 2
variances[i] /= minor_len
return means, variances
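- To convince ourselves that the loop above is just the per-row mean and (population) variance over all positions, including the implicit zeros, here is a small check against dense numpy (my own toy matrix):
import numpy as np
from scipy.sparse import csr_matrix

X = csr_matrix(np.array([[0., 2., 0., 4.],
                         [1., 0., 3., 0.]]))
data, indices, indptr = X.data, X.indices, X.indptr
major_len, minor_len = X.shape

means = np.zeros(major_len)
variances = np.zeros(major_len)
for i in range(major_len):
    start, end = indptr[i], indptr[i + 1]
    counts = end - start
    means[i] = data[start:end].sum() / minor_len              # zeros add nothing to the sum
    variances[i] = ((data[start:end] - means[i]) ** 2).sum()
    variances[i] += (minor_len - counts) * means[i] ** 2      # contribution of the implicit zeros
    variances[i] /= minor_len

dense = X.toarray()
print(np.allclose(means, dense.mean(axis=1)), np.allclose(variances, dense.var(axis=1)))  # True True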
# ---------------------- # How the data is binned, and how highly variable genes are selected by dispersion.
df['mean_bin'] = pd.cut(df['means'], bins=n_bins)
# pd.cut bins values according to bins: if bins is a positive integer, the value range is split into that many equal-width intervals; if it is a sequence, its values are used as the bin edges.
# Here the genes are binned by their mean: the range is split into n_bins equal-width intervals and each value is assigned to its interval, for example:
# pd.cut(np.array([2,6,4,8,1,5,9]),bins=[1,4,7,10]) returns:
# [(1.0, 4.0], (4.0, 7.0], (1.0, 4.0], (7.0, 10.0], NaN, (4.0, 7.0], (7.0, 10.0]], i.e. 2 falls into the interval (1.0, 4.0].
disp_grouped = df.groupby('mean_bin')['dispersions']  # group by the mean bin, then compute the mean and standard deviation of the dispersions within each bin
disp_mean_bin = disp_grouped.mean()  # mean dispersion within each bin
disp_std_bin = disp_grouped.std(ddof=1)  # standard deviation of the dispersion within each bin
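- Downstream, each gene's dispersion is compared only with genes of similar mean expression; a pandas sketch of this within-bin z-scoring, simplified from the idea above and run on made-up data:
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({
    'means': rng.uniform(0.1, 5.0, 200),        # toy per-gene means
    'dispersions': rng.uniform(0.5, 3.0, 200),  # toy per-gene dispersions
})
df['mean_bin'] = pd.cut(df['means'], bins=20)   # bin genes by their mean
grouped = df.groupby('mean_bin')['dispersions']
# within-bin z-score of the dispersion: compare each gene only with genes of similar mean
df['dispersions_norm'] = (df['dispersions'] - grouped.transform('mean')) / grouped.transform('std')
print(df.nlargest(5, 'dispersions_norm')[['means', 'dispersions', 'dispersions_norm']])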
10. Dimensionality reduction (PCA)
sc.pp.pca(adata, n_comps=50, use_highly_variable=True, svd_solver='arpack')
sc.pp.neighbors(adata)
def pca(
data: Union[AnnData, np.ndarray, spmatrix],
n_comps: Optional[int] = None,
zero_center: Optional[bool] = True,
svd_solver: str = 'arpack',
random_state: AnyRandom = 0,
return_info: bool = False,
    use_highly_variable: Optional[bool] = None,  # whether to restrict the PCA to highly variable genes
dtype: str = 'float32',
copy: bool = False,
chunked: bool = False,
chunk_size: Optional[int] = None,
) -> Union[AnnData, np.ndarray, spmatrix]:
# ----------------------------------
if use_highly_variable is None:  # if not specified, highly variable genes are still used whenever they have already been annotated, rather than being ignored as the default value of None might suggest
    use_highly_variable = True if 'highly_variable' in adata.var.keys() else False
X = adata_comp.X
from sklearn.decomposition import PCA  # the decomposition itself is done with sklearn's PCA
output = _pca_with_sparse(
X, n_comps, solver=svd_solver, random_state=random_state
)
# -------------------------------------------------------------------------
def neighbors(
adata: AnnData,
n_neighbors: int = 15,
n_pcs: Optional[int] = None,
use_rep: Optional[str] = None,
knn: bool = True,
random_state: AnyRandom = 0,
method: Optional[_Method] = 'umap',
metric: Union[_Metric, _MetricFn] = 'euclidean',
metric_kwds: Mapping[str, Any] = MappingProxyType({}),
key_added: Optional[str] = None,
copy: bool = False,
) -> Optional[AnnData]:
neighbors = Neighbors(adata)
neighbors.compute_neighbors(
n_neighbors=n_neighbors,
knn=knn,
n_pcs=n_pcs,
use_rep=use_rep,
method=method,
metric=metric,
metric_kwds=metric_kwds,
random_state=random_state,
)
# -----------------------------------------------
X = _choose_representation(self._adata, use_rep=use_rep, n_pcs=n_pcs)
X = adata.obsm['X_pca'][:, :n_pcs]  # the distance computation uses the PCA representation
# -------------------------------------------
from sklearn.metrics import pairwise_distances
_distances = pairwise_distances(X, metric=metric, **metric_kwds)  # the distance matrix is computed with sklearn's pairwise_distances
knn_indices, knn_distances = _get_indices_distances_from_dense_matrix(
_distances, n_neighbors
)
def _get_indices_distances_from_dense_matrix(D, n_neighbors: int):
    sample_range = np.arange(D.shape[0])[:, None]  # row indices as a column vector, used for fancy indexing below
    indices = np.argpartition(D, n_neighbors - 1, axis=1)[:, :n_neighbors]  # partial sort: values smaller than the n_neighbors-th smallest end up before it, larger ones after; a coarse split into two groups rather than a full sort
    indices = indices[sample_range, np.argsort(D[sample_range, indices])]
    distances = D[sample_range, indices]  # in the end this picks, for each sample, the distances to its n_neighbors nearest samples
    return indices, distances
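- A small standalone run of the same trick on a toy distance matrix (my own numbers): argpartition finds the n_neighbors smallest distances per row without fully sorting the row, and the final argsort only orders those few.
import numpy as np

D = np.array([[0., 4., 1., 3.],
              [4., 0., 2., 9.],
              [1., 2., 0., 5.],
              [3., 9., 5., 0.]])   # toy symmetric distance matrix
n_neighbors = 2

sample_range = np.arange(D.shape[0])[:, None]
indices = np.argpartition(D, n_neighbors - 1, axis=1)[:, :n_neighbors]  # unordered n_neighbors smallest per row
indices = indices[sample_range, np.argsort(D[sample_range, indices])]   # order them by actual distance
distances = D[sample_range, indices]

print(indices)    # each row: the sample itself first (distance 0), then its nearest neighbour
print(distances)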
11. Embedding with UMAP
sc.tl.umap(adata)
def umap(
adata: AnnData,
min_dist: float = 0.5,
spread: float = 1.0,
n_components: int = 2,
maxiter: Optional[int] = None,
alpha: float = 1.0,
gamma: float = 1.0,
negative_sample_rate: int = 5,
init_pos: Union[_InitPos, np.ndarray, None] = 'spectral',
random_state: AnyRandom = 0,
a: Optional[float] = None,
b: Optional[float] = None,
copy: bool = False,
method: Literal['umap', 'rapids'] = 'umap',
neighbors_key: Optional[str] = None,
) -> Optional[AnnData]:
# ----------------------------------------------------------------------------
init_coords = 'spectral'  # umap parameter
X = _choose_representation(
adata,
neigh_params.get('use_rep', None),
neigh_params.get('n_pcs', None),
silent=True,
)
X_umap = simplicial_set_embedding(
X,
neighbors['connectivities'].tocoo(),
n_components,
alpha,
a,
b,
gamma,
negative_sample_rate,
n_epochs,
init_coords,
random_state,
neigh_params.get('metric', 'euclidean'),
neigh_params.get('metric_kwds', {}),
verbose=settings.verbosity > 3,
)
def simplicial_set_embedding(*args, **kwargs):
from umap.umap_ import simplicial_set_embedding
    X_umap, _ = simplicial_set_embedding(  # the actual computation is done by simplicial_set_embedding from the umap package
*args,
densmap=False,
densmap_kwds={},
output_dens=False,
**kwargs,
)
return X_umap
12. Clustering with Leiden
sc.tl.leiden(adata, key_added='clusters') # key_added: adata.obs: key under which to add the cluster labels.
def leiden(
adata: AnnData,
resolution: float = 1,
*,
restrict_to: Optional[Tuple[str, Sequence[str]]] = None,
random_state: _utils.AnyRandom = 0,
key_added: str = 'leiden',
adjacency: Optional[sparse.spmatrix] = None,
directed: bool = True,
use_weights: bool = True,
n_iterations: int = -1,
partition_type: Optional[Type[MutableVertexPartition]] = None,
neighbors_key: Optional[str] = None,
obsp: Optional[str] = None,
copy: bool = False,
**partition_kwargs,
) -> Optional[AnnData]:
adjacency = _utils._choose_graph(adata, obsp, neighbors_key)
return neighbors['connectivities']
g = _utils.get_igraph_from_adjacency(adjacency, directed=directed)
def get_igraph_from_adjacency(adjacency, directed=None):
import igraph as ig
sources, targets = adjacency.nonzero()
weights = adjacency[sources, targets]
g = ig.Graph(directed=directed)
g.add_vertices(adjacency.shape[0]) # this adds adjacency.shape[0] vertices
g.add_edges(list(zip(sources, targets)))
g.es['weight'] = weights
partition_type = leidenalg.RBConfigurationVertexPartition
part = leidenalg.find_partition(g, partition_type, **partition_kwargs)  # the partitioning itself is computed by leidenalg's find_partition
groups = np.array(part.membership)
adata.obs[key_added] = pd.Categorical(
values=groups.astype('U'),
categories=natsorted(map(str, np.unique(groups))),
)
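- Putting the pieces together on a toy adjacency matrix (requires python-igraph and leidenalg; this mirrors the calls above rather than scanpy's exact code):
import numpy as np
from scipy import sparse
import igraph as ig
import leidenalg

# toy graph: two 3-node cliques joined by a single weak edge
dense = np.zeros((6, 6))
for a, b in [(0, 1), (0, 2), (1, 2), (3, 4), (3, 5), (4, 5)]:
    dense[a, b] = dense[b, a] = 1.0
dense[2, 3] = dense[3, 2] = 0.1
adjacency = sparse.csr_matrix(dense)

sources, targets = adjacency.nonzero()
weights = np.asarray(adjacency[sources, targets]).ravel()
g = ig.Graph(directed=True)
g.add_vertices(adjacency.shape[0])
g.add_edges(list(zip(sources, targets)))
g.es['weight'] = weights

part = leidenalg.find_partition(
    g, leidenalg.RBConfigurationVertexPartition, weights='weight', resolution_parameter=1.0
)
print(part.membership)   # e.g. [0, 0, 0, 1, 1, 1] -- the two cliques come out as two clusters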
13. UMAP plot
sc.pl.umap(adata, color='clusters', palette=sc.pl.palettes.default_20,save="umap_fig.pdf")
def umap(adata, **kwargs) -> Union[Axes, List[Axes], None]:
return embedding(adata, 'umap', **kwargs)
14. Visualizing the spatial information
sc.pl.spatial(adata, img_key = "hires",color=['total_counts', 'n_genes'],save="spatial_totalcount_ngene.pdf")
sc.pl.spatial(adata, img_key = "hires", color="clusters", size=1.5,save="spatial_cluster.pdf")
def spatial(
adata,
*,
basis: str = "spatial",
img: Union[np.ndarray, None] = None,
img_key: Union[str, None, Empty] = _empty,
library_id: Union[str, Empty] = _empty,
crop_coord: Tuple[int, int, int, int] = None,
alpha_img: float = 1.0,
bw: Optional[bool] = False,
size: float = 1.0,
scale_factor: Optional[float] = None,
spot_size: Optional[float] = None,
na_color: Optional[ColorLike] = None,
show: Optional[bool] = None,
return_fig: Optional[bool] = None,
save: Union[bool, str, None] = None,
**kwargs,
) -> Union[Axes, List[Axes], None]:
library_id, spatial_data = _check_spatial_data(adata.uns, library_id)
img, img_key = _check_img(spatial_data, img, img_key, bw=bw)
spot_size = _check_spot_size(spatial_data, spot_size)
return spatial_data['scalefactors']['spot_diameter_fullres']
scale_factor = _check_scale_factor(spatial_data, img_key=img_key, scale_factor=scale_factor)
return spatial_data['scalefactors'][f"tissue_{img_key}_scalef"]
axs = embedding(
adata,
basis=basis,
scale_factor=scale_factor,
size=circle_radius,
na_color=na_color,
show=False,
save=False,
**kwargs,
)
15. Differential expression analysis
sc.tl.rank_genes_groups(adata, "clusters", inplace=True)  # find marker genes for each cluster
sc.pl.rank_genes_groups_heatmap(adata, groups="4", groupby="clusters", show=False, n_genes=10, save="rank_genes_group_heatmap.pdf")  # heatmap of the expression of the top 10 differential genes
def rank_genes_groups(
adata: AnnData,
groupby: str,
use_raw: Optional[bool] = None,
groups: Union[Literal['all'], Iterable[str]] = 'all',
reference: str = 'rest',
    n_genes: Optional[int] = None,  # number of genes shown in the returned tables; defaults to all genes
rankby_abs: bool = False,
pts: bool = False,
key_added: Optional[str] = None,
copy: bool = False,
method: _Method = None, # The default method is `'t-test'`,
# 't-test_overestim_var overestimates variance of each group,
# 'wilcoxon' uses Wilcoxon rank-sum,
# 'logreg' uses logistic regression. See [Ntranos18]_,
    corr_method: _CorrMethod = 'benjamini-hochberg',  # multiple-testing correction method; avail_corr = {'benjamini-hochberg', 'bonferroni'}
tie_correct: bool = False,
layer: Optional[str] = None,
**kwds,
) -> Optional[AnnData]:
# --------------------------------------
if 'only_positive' in kwds:
    rankby_abs = not kwds.pop('only_positive')  # only_positive=True ranks by signed score, i.e. keeps up-regulated genes at the top
test_obj = _RankGenes(adata, groups_order, groupby, reference, use_raw, layer, pts)
test_obj.compute_statistics(
method, corr_method, n_genes_user, rankby_abs, tie_correct, **kwds
)
# --------------------------------- t-test
with np.errstate(invalid="ignore"):
    scores, pvals = stats.ttest_ind_from_stats(  # the t-test p-values are computed with scipy's stats.ttest_ind_from_stats
mean1=mean_group,
std1=np.sqrt(var_group),
nobs1=ns_group,
mean2=mean_rest,
std2=np.sqrt(var_rest),
nobs2=ns_rest,
equal_var=False, # Welch's
)
# --------------------------------------- p-value correction in scanpy.
if pvals is not None:
    self.stats[group_name, 'pvals'] = pvals[global_indices]
    if corr_method == 'benjamini-hochberg':  ### option 1
        from statsmodels.stats.multitest import multipletests
        pvals[np.isnan(pvals)] = 1
        _, pvals_adj, _, _ = multipletests(
            pvals, alpha=0.05, method='fdr_bh'
        )
    elif corr_method == 'bonferroni':  ### option 2
        pvals_adj = np.minimum(pvals * n_genes, 1.0)
    self.stats[group_name, 'pvals_adj'] = pvals_adj[global_indices]
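- For intuition, Benjamini-Hochberg can be written out by hand and checked against statsmodels (a small illustration with made-up p-values):
import numpy as np
from statsmodels.stats.multitest import multipletests

def bh_adjust(pvals):
    # manual Benjamini-Hochberg: p * n / rank, then enforce monotonicity from the largest p downwards
    pvals = np.asarray(pvals, dtype=float)
    n = pvals.size
    order = np.argsort(pvals)
    scaled = pvals[order] * n / np.arange(1, n + 1)
    scaled = np.minimum.accumulate(scaled[::-1])[::-1]
    adj = np.empty(n)
    adj[order] = np.clip(scaled, 0, 1)
    return adj

pvals = np.array([0.001, 0.008, 0.039, 0.041, 0.042, 0.06, 0.074, 0.205])
_, pvals_adj, _, _ = multipletests(pvals, alpha=0.05, method='fdr_bh')
print(np.allclose(bh_adjust(pvals), pvals_adj))   # True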
16. Closing remarks
By walking through the scanpy source in this simple way, we now have a rough idea of what scanpy actually does during spatial transcriptomics analysis; I hope the read was worthwhile for you as well. If this write-up helps even a single reader, that is a great honour. You are also welcome to look up "豫见生信" on WeChat to discuss and learn together.