Source code for pytoda.datasets.tests.test_gene_expression_dataset

"""Testing GeneExpressionDataset with eager backend."""
import os
import unittest

import numpy as np
import pandas as pd
from torch.utils.data import DataLoader

from pytoda.datasets import GeneExpressionDataset
from pytoda.tests.utils import TestFileContent

CONTENT = os.linesep.join(
    [
        'genes,A,C,B,D',
        'sample_3,9.45,4.984,7.016,8.336',
        'sample_2,7.188,0.695,10.34,6.047',
        'sample_1,9.25,6.133,5.047,5.6',
    ]
)
MORE_CONTENT = os.linesep.join(
    [
        'genes,B,C,D,E,F',
        'sample_10,4.918,0.0794,1.605,3.463,10.18',
        'sample_11,3.043,8.56,1.961,0.6226,5.027',
        'sample_12,4.76,1.124,6.06,0.3743,11.05',
        'sample_13,0.626,5.164,4.277,4.414,2.7',
    ]
)


[docs]class TestGeneExpressionDatasetEagerBackend(unittest.TestCase): """Testing GeneExpressionDataset with eager backend."""
[docs] def setUp(self): self.backend = 'eager' print(f'backend is {self.backend}') self.content = CONTENT self.other_content = MORE_CONTENT
[docs] def test___len__(self) -> None: """Test __len__.""" with TestFileContent(self.content) as a_test_file: with TestFileContent(self.other_content) as another_test_file: gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, ) self.assertEqual(len(gene_expression_dataset), 7)
[docs] def test___getitem__(self) -> None: """Test __getitem__.""" with TestFileContent(self.content) as a_test_file: with TestFileContent(self.other_content) as another_test_file: df = pd.concat( [ pd.read_csv(a_test_file.filename, index_col=0), pd.read_csv(another_test_file.filename, index_col=0), ], sort=False, ) gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, ) gene_list = gene_expression_dataset.gene_list mean = df.mean()[gene_list].values std = df.std(ddof=0)[gene_list].values for i, (key, row) in enumerate(df[gene_list].iterrows()): np.testing.assert_almost_equal( gene_expression_dataset[i].numpy(), (row.values - mean) / std, 5 ) np.testing.assert_almost_equal(gene_expression_dataset.mean, mean, 5) np.testing.assert_almost_equal(gene_expression_dataset.std, std, 5) gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, standardize=False, min_max=True, ) minimum = df.min()[gene_list].values maximum = df.max()[gene_list].values diff = maximum - minimum for i, (key, row) in enumerate(df[gene_list].iterrows()): np.testing.assert_almost_equal( gene_expression_dataset[i].numpy(), (row.values - minimum) / diff, 5, ) np.testing.assert_almost_equal(gene_expression_dataset.min, minimum, 5) np.testing.assert_almost_equal(gene_expression_dataset.max, maximum, 5)
[docs] def test_processing_parameters_standardize_reindex(self) -> None: with TestFileContent(self.content) as a_test_file, TestFileContent( self.other_content ) as another_test_file: # feature not in data is filled with zeros feature_list = ['E', 'C', 'D', 'B', 'all_missing'] standard_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, gene_list=feature_list, backend=self.backend, index_col=0, ) self.assertEqual(standard_dataset[0][-1], 0) gene_list = standard_dataset.gene_list df = pd.concat( [ pd.read_csv(a_test_file.filename, index_col=0), pd.read_csv(another_test_file.filename, index_col=0), ], sort=False, ).reindex( columns=gene_list ) # , fill_value=0.0) # scalar scaling (single max and min) flat = df.values.flatten() # allow nan mean_float = np.nanmean(flat) std_float = np.nanstd(flat) for mean, std in [ # scalar [mean_float, std_float], # list length 1 [[mean_float], [std_float]], ]: processing_parameters = { 'mean': mean, 'std': std, } standard_ds = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, gene_list=feature_list, backend=self.backend, index_col=0, standardize=True, min_max=False, processing_parameters=processing_parameters, impute=None, ) # collect flat values ds_1d = np.concatenate([item.numpy() for item in standard_ds]) # allowing/ignoring nan np.testing.assert_almost_equal(np.nanmean(ds_1d), 0) np.testing.assert_almost_equal(np.nanstd(ds_1d), 1) mean_array = df.mean().values std_array = df.std(ddof=0).values # NOTE: numpy and pytoda use ddof of 0, whereas pandas default is 1 for mean, std in [ # list [mean_array.tolist(), std_array.tolist()], # ndarray [mean_array, std_array], ]: processing_parameters = { 'mean': mean, 'std': std, } standard_ds = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, gene_list=feature_list, backend=self.backend, index_col=0, standardize=True, min_max=False, processing_parameters=processing_parameters, impute=None, ) # collect transformed values ds_2d = np.stack([item.numpy() for item in standard_ds]) # contains nan ds_means = np.nanmean(ds_2d, axis=0) ds_stds = np.nanstd(ds_2d, axis=0) # debug std # standard_ds.std # std_array # == df.std(ddof=0).values # df.std().values # == df.std(ddof=1).values for index, feature in enumerate(standard_ds.gene_list): if feature == 'all_missing': # no features at all so transformed stat is nan self.assertTrue(np.isnan(ds_means[index])) self.assertTrue(np.isnan(ds_stds[index])) # original stats are also nan np.testing.assert_almost_equal( standard_ds.mean[index], df[feature].mean() ) np.testing.assert_almost_equal( standard_ds.std[index], df[feature].std(ddof=0) ) continue # note some NaN values 'E' still have statistics # TODO our reduce statistics has to cope with some NaN! # until then 'E' fails # check transformed std / mean are 1 and 0 per feature np.testing.assert_almost_equal(ds_means[index], 0) np.testing.assert_almost_equal(ds_stds[index], 1) # external statisic matches internally used statistics np.testing.assert_almost_equal( standard_ds.mean[index], df[feature].mean() ) np.testing.assert_almost_equal( standard_ds.std[index], df[feature].std(ddof=0) )
# order of reduced means matches order in
[docs] def test_processing_parameters_minmax(self) -> None: with TestFileContent(self.content) as a_test_file, TestFileContent( self.other_content ) as another_test_file: minmax_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, standardize=False, min_max=True, ) gene_list = minmax_dataset.gene_list df = pd.concat( [ pd.read_csv(a_test_file.filename, index_col=0), pd.read_csv(another_test_file.filename, index_col=0), ], sort=False, )[gene_list] # with min max scaling we can check for values 0 and 1 maximum_array = df.max().values minimum_array = df.min().values # scalar scaling (single max and min) max_n, max_p = map(int, np.unravel_index(np.argmax(df.values), df.shape)) min_n, min_p = map(int, np.unravel_index(np.argmin(df.values), df.shape)) for maximum, minimum in [ # scalar [np.max(maximum_array), np.min(minimum_array)], # list length 1 [[np.max(maximum_array)], [np.min(minimum_array)]], ]: processing_parameters = { 'max': maximum, 'min': minimum, } minmax_ds = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, standardize=False, min_max=True, processing_parameters=processing_parameters, ) self.assertEqual(minmax_ds[max_n][max_p], 1) self.assertEqual(minmax_ds[min_n][min_p], 0) # array scaling (feature wise max and min) max_indeces = map(int, np.argmax(df.values, axis=0)) min_indeces = map(int, np.argmin(df.values, axis=0)) for maximum, minimum in [ # list [maximum_array.tolist(), minimum_array.tolist()], # ndarray [maximum_array, minimum_array], ]: processing_parameters = { 'max': maximum, 'min': minimum, } minmax_ds = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, standardize=False, min_max=True, processing_parameters=processing_parameters, ) # check max_index / min_index are 1 and 0 per feature for feature_index, sample_index in enumerate(max_indeces): self.assertEqual(minmax_ds[sample_index][feature_index], 1) for feature_index, sample_index in enumerate(min_indeces): self.assertEqual(minmax_ds[sample_index][feature_index], 0)
[docs] def test_data_loader(self) -> None: """Test data_loader.""" gene_subset_list = ['B', 'D', 'F'] with TestFileContent(self.content) as a_test_file: with TestFileContent(self.other_content) as another_test_file: gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, gene_list=gene_subset_list, backend=self.backend, index_col=0, ) data_loader = DataLoader( gene_expression_dataset, batch_size=2, shuffle=True ) for batch_index, batch in enumerate(data_loader): self.assertEqual( batch.shape, ( 1 if batch_index == 3 else 2, gene_expression_dataset.number_of_features, ), ) if batch_index > 2: break
def _test_indexed(self, ds, keys, index): key = keys[index] positive_index = index % len(ds) # get_key (support for negative index?) self.assertEqual(key, ds.get_key(positive_index)) self.assertEqual(key, ds.get_key(index)) # get_index self.assertEqual(positive_index, ds.get_index(key)) # get_item_from_key self.assertTrue(all(ds[index] == ds.get_item_from_key(key))) # keys self.assertSequenceEqual(keys, list(ds.keys())) # duplicate keys self.assertFalse(ds.has_duplicate_keys)
[docs] def test_all_base_for_indexed_methods(self): with TestFileContent(self.content) as a_test_file: with TestFileContent(self.other_content) as another_test_file: gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, another_test_file.filename, backend=self.backend, index_col=0, ) gene_expression_ds_0 = GeneExpressionDataset( a_test_file.filename, backend=self.backend, index_col=0 ) gene_expression_ds_1 = GeneExpressionDataset( another_test_file.filename, backend=self.backend, index_col=0 ) all_keys = [ row.split(',')[0] for row in self.content.split(os.linesep)[1:] + self.other_content.split(os.linesep)[1:] ] for ds, keys in [ (gene_expression_dataset, all_keys), (gene_expression_ds_0, all_keys[: len(gene_expression_ds_0)]), (gene_expression_ds_1, all_keys[len(gene_expression_ds_0) :]), (gene_expression_ds_0 + gene_expression_ds_1, all_keys), ]: index = -1 self._test_indexed(ds, keys, index) # duplicate duplicate_ds = gene_expression_ds_0 + gene_expression_ds_0 self.assertTrue(duplicate_ds.has_duplicate_keys) # GeneExpressionDataset does not test and raise with TestFileContent(self.content) as a_test_file: gene_expression_dataset = GeneExpressionDataset( a_test_file.filename, a_test_file.filename, backend=self.backend, index_col=0, ) self.assertTrue(gene_expression_dataset.has_duplicate_keys)
[docs]class TestGeneExpressionDatasetLazyBackend( TestGeneExpressionDatasetEagerBackend ): # noqa """Testing GeneExpressionDataset with lazy backend."""
[docs] def setUp(self): self.backend = 'lazy' print(f'backend is {self.backend}') self.content = CONTENT self.other_content = MORE_CONTENT
if __name__ == '__main__': unittest.main()