Source code for disbi.views

"""
DISBi views that need to subclassed and configured with 
the appropriate experiment models by a concrete app. 
"""
# standard library
import re
from io import StringIO

# third-party
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import numpy as np

# Django
from django.forms import formset_factory
from django.http.response import HttpResponse, JsonResponse
from django.shortcuts import redirect, render
from django.views.decorators.http import require_safe
from django.views.generic import View

# DISBi
from disbi.cache_table import check_for_table_change
from disbi.exceptions import NoRelatedMeasurementModel, NotSupportedError
from disbi.forms import construct_forms, foldchange_form_factory
from disbi.option_utils import get_display_name
from disbi.experiment_filter import get_requested_experiments
from disbi.result import DataResult
from disbi.templatetags.custom_template_tags import nested_dict_as_table
from disbi.utils import get_id_str, get_ids


# ---------------------------- main views -----------------------------
[docs]class DisbiExperimentFilterView(View): """ View for showing dynamic formsets, allowing to choose all combinations. """ experiment_model = None
[docs] def get(self, request): """ Returns: TemplateResponse: The rendered forms without initial data. """ formclasses = construct_forms(self.experiment_model) formset_list = [] for formclass in formclasses: FormSetClass = (formset_factory(formclass.classname, max_num=formclass.classname.max_num)) formset_list.append(FormSetClass(prefix=formclass.prefix)) context = {'formset_list': formset_list, 'subheader': 'Filter'} return render(request, 'disbi/filter.html', context)
[docs] def post(self, request): """ HttpResponseRedirect: For valid POST requests the client will be redirected to the appropriate data view. """ formclasses = construct_forms(self.experiment_model) app_label = self.experiment_model._meta.app_label formset_list = [] for formclass in formclasses: FormSetClass = formset_factory(formclass.classname, max_num=formclass.classname.max_num) formset_list.append(FormSetClass(request.POST, prefix=formclass.prefix)) # Validation for formset in formset_list: if not formset.is_valid(): validated = False break else: validated = True continue if validated: requested_exps = get_requested_experiments(formset_list, self.experiment_model) id_str = get_id_str(requested_exps) url = '/{}/data/{}/'.format(app_label, id_str) return redirect(url) # If not validated... else: formset_list = [] # Instantiate formsets with prefix and POST data. for formclass in formclasses: FormSetClass = formset_factory(formclass.classname, max_num=formclass.classname.max_num) # Check whether POST contains data for this specific formset. has_data = False for pkey in request.POST.keys(): if pkey.startswith(formclass.prefix + '-0-'): if request.POST.get(pkey): has_data = True break else: continue # If there is data instantiate with POST data, # else instantiate empty formset ## This is done, because forms completely removed with JS ## do not show up again otherwise if has_data: formset_list.append(FormSetClass(request.POST, prefix=formclass.prefix)) else: formset_list.append(FormSetClass(prefix=formclass.prefix)) context = {'formset_list': formset_list, 'subheader': 'Filter'} return render(request, 'disbi/filter.html', context)
[docs]class DisbiDataView(View): """ View for creating the the basic data view without the data table. """ experiment_meta_model = None
[docs] def get(self, request, exp_id_str): """ View for creating and displaying the result table. Args: request: The WSGI request. exp_id_str: The ids of all requested experiments from the table view joined on "_". Returns: TemplateResponse: The template for the data view with the appropriate form and the result table with information about the experiments. """ # Check if the backbone table needs to be recreated. check_for_table_change(self.experiment_meta_model, check_for='bio') # Check if the data tables needs to be dropped. check_for_table_change(self.experiment_meta_model, check_for='data') try: # Get the displayed experiments from the URL. exp_ids = get_ids(exp_id_str) requested_exps = self.experiment_meta_model.objects.filter(pk__in=exp_ids) num_exps = len(requested_exps) if num_exps > 1: # Instantiate empty fold change form. FoldChangeForm = foldchange_form_factory(requested_exps) FoldChangeFormset = formset_factory(FoldChangeForm) foldchange_formset = FoldChangeFormset(prefix='fc') # Instantiate empty plot compare form. PlotCompareForm = foldchange_form_factory(requested_exps) #PlotCompareFormset = formset_factory(PlotCompareForm) plotcompare_form = PlotCompareForm() else: foldchange_formset = None plotcompare_form = None # Create the data table. result = DataResult(requested_exps, self.experiment_meta_model) table_data = result.get_or_create_base_table() # Create the talbe with information about the selected experiments. view_exps = [exp.result_view() for exp in requested_exps] context = {'table_data': table_data, 'view_exps': view_exps, 'num_exps': num_exps, 'foldchange_formset': foldchange_formset, 'plotcompare_form': plotcompare_form, 'subheader': 'Data'} return render(request, 'disbi/data.html', context) except NoRelatedMeasurementModel as exc: return HttpResponse('{} has no experimental data yet.'.format(exc.exp))
# ---------------------------- AJAX views -----------------------------
[docs]class DisbiExpInfoView(View): """ View for getting information about the experiments in the preview table. """ experiment_model = None
[docs] def post(self, request): """ Get information about matched experiments. Args: request: The WSGI request. Returns: JSONResponse: JSON object with number of matched experiments and a HTML table with information about those experiments. """ formclasses = construct_forms(self.experiment_model) formset_list = [] # Instantiate formsets with prefix and POST data. for formclass in formclasses: FormSetClass = formset_factory(formclass.classname, max_num=formclass.classname.max_num) formset_list.append(FormSetClass(request.POST, prefix=formclass.prefix)) # Validation for formset in formset_list: if not formset.is_valid(): validated = False break else: validated = True continue if validated: requested_exps = get_requested_experiments(formset_list, self.experiment_model) num_exps = len(requested_exps) if num_exps > 0: view_exps = [exp.view() for exp in requested_exps] table_exps = nested_dict_as_table(view_exps, make_foot=False) else: view_exps = None table_exps = None return JsonResponse( {'numExps': num_exps, 'expParams': view_exps, 'tableExps': table_exps} )
[docs]class DisbiGetTableData(View): """ View for initially getting the data for the datatable. """ experiment_meta_model = None
[docs] def get(self, request, exp_id_str): """ Return new data for the datatable with new columns for calculated fold changes. Args: request: The WSGI request. exp_id_str: The ids of all requested experiments from the table view joined on "_". Returns: JSONResponse: The data for the datatable. """ response = {} response['status'] = None response['data'] = {} response['err_msg'] = None exp_ids = get_ids(exp_id_str) requested_exps = self.experiment_meta_model.objects.filter(pk__in=exp_ids) result = DataResult(requested_exps, self.experiment_meta_model) table_data = result.get_or_create_base_table(fetch_as='namedtuple') response['data']['columns'] = table_data[0]._fields response['data']['tableData'] = [tuple(row) for row in table_data] return JsonResponse(response)
[docs]class DisbiCalculateFoldChangeView(View): """ View for calculating the fold change between two experiments. """ experiment_model = None experiment_meta_model = None
[docs] def post(self, request, exp_id_str): """ Return new data for the datatable with new columns for calculated fold changes. Args: request: The WSGI request. exp_id_str: The ids of all requested experiments from the table view joined on "_". Returns: JSONResponse: The new data for the datatable or the error message. """ response = {} response['status'] = None response['data'] = {} response['err_msg'] = None exp_ids = get_ids(exp_id_str) # Instantiate formset with POST data requested_exps = self.experiment_model.objects.filter(pk__in=exp_ids) FoldChangeForm = foldchange_form_factory(requested_exps) FoldChangeFormset = formset_factory(FoldChangeForm) foldchange_formset = FoldChangeFormset(request.POST, prefix='fc') # Validation try: if foldchange_formset.is_valid(): # Filter out empty form data. These can result if both fields # are left empty. exps_for_foldchange = [cleaned_data for cleaned_data in foldchange_formset.cleaned_data if cleaned_data] # TODO: This should be done in the form. # Check data. ## Check for empty form. if not exps_for_foldchange: raise ValueError('You need to select at least two unequal experiments ' 'to calculate a fold change.') ## Check whether each two experiments have the same measurementmodel. for pair in exps_for_foldchange: dividend = self.experiment_meta_model.objects.get(pk=pair['dividend'].pk) divisor = self.experiment_meta_model.objects.get(pk=pair['divisor'].pk) if dividend.measurementmodel != divisor.measurementmodel: raise ValueError('To compare experiments, they need to have the same datatype.') result = DataResult(requested_exps, self.experiment_meta_model) table_data = result.add_foldchange(exps_for_foldchange, fetch_as='namedtuple') response['data']['columns'] = table_data[0]._fields response['data']['tableData'] = [tuple(row) for row in table_data] response['status'] = True return JsonResponse(response) else: response['err_msg'] = ('You need to select at least two unequal experiments ' 'to calculate a fold change.') response['status'] = False return JsonResponse(response) except ValueError as exc: response['err_msg'] = str(exc) response['status'] = False return JsonResponse(response)
[docs]class DisbiComparePlotView(View): """ View for generating a scatter plot that compares two experiments. """ experiment_model = None experiment_meta_model = None
[docs] def post(self, request, exp_id_str): """ Get the scatter plot comparing two experiments. If the data model of the two experiments matches, a plot is generated, else an error message is raised. Args: request: The WSGI request. exp_id_str: The ids of all requested experiments from the table view joined on "_". Returns: JSONResponse: The plot image SVG or the error message. """ response = {} response['status'] = None response['data'] = None response['err_msg'] = None exp_ids = get_ids(exp_id_str) # Instantiate formset with POST data requested_exps = self.experiment_model.objects.filter(pk__in=exp_ids) PlotCompareForm = foldchange_form_factory(requested_exps) plotcompare_form = PlotCompareForm(request.POST) # Validation try: if plotcompare_form.is_valid(): exps_for_compare = plotcompare_form.cleaned_data # Get the metainfo proxy to check the measurementmodel. dividend = self.experiment_meta_model.objects.get(pk=exps_for_compare['dividend'].pk) divisor = self.experiment_meta_model.objects.get(pk=exps_for_compare['divisor'].pk) if dividend.measurementmodel != divisor.measurementmodel: raise ValueError('To compare experiments, they need to have the same datatype.') if exps_for_compare: # Get the data from the DB result = DataResult(requested_exps, self.experiment_meta_model) data = result.get_exp_columns(exps_for_compare) # Remove None values, because might be missing in the SVG otherwise. data = [pair for pair in data if pair[0] and pair[1]] # Load the data into arrays. data = np.array(data).transpose() x = data[0] y = data[1] fig = plt.figure() ax = fig.add_subplot(111) # Add the scatter plot. ax.plot(x, y, '.', color='#005374') # color=BRICS blue 1 # Add a grey line through the origin. a = np.linspace(*ax.get_xbound()) b = np.linspace(*ax.get_ybound()) ax.plot(a, b, '--', color='#5f5f5f') # color=BRICS grey 2 # Set labels. ax.set_xlabel(str(dividend)) ax.set_ylabel(str(divisor)) # Write SVG tho string buffer and return it as response. buf = StringIO() fig.savefig(buf, format='svg') buf.seek(0) # rewind the data plt.close('all') response['data'] = buf.read() response['status'] = True return JsonResponse(response) else: response['status'] = False response['err_msg'] = ('You need to select at least two unequal experiments ' 'to compare experiments.') return JsonResponse(response) except ValueError as exc: response['status'] = False response['err_msg'] = str(exc) return JsonResponse(response)
[docs]class DisbiDistributionPlotView(View): """ View for generating a histogram of the distribution of a column in the data table. """ experiment_model = None experiment_meta_model = None
[docs] def post(self, request, exp_id_str): """ Get the distribution of a column as a histogram. If a non fold change column is plotted the matching data is fetched from the DB with the ORM. If a fold change column is selected, a new DataResult object is instantiated and only the fold change column is retrieved from the result table cached in the DB. Args: request: The WSGI request. exp_id_str: The ids of all requested experiments from the table view joined on "_". Returns: JSONResponse: The plot image SVG or the error message. """ response = {} response['status'] = None response['data'] = None response['err_msg'] = None try: fold_change = False # The column to be plotted. col = self.request.POST['column'] # Pattern that matches the trailing id of th experiment (e.g. _16) # or of a fold change column (e.g. _16_17). id_pattern = re.compile(r'_(\d+(?:_\d+)*)') # Get the id. exp_id = id_pattern.search(col).group(1) # Substitute the experiment id with '' to get the name. column_display_name = id_pattern.sub('', col) if 'fc' not in column_display_name: # We're not dealing with a fold change. exp = self.experiment_meta_model.objects.get(pk=exp_id) # Map the names used to display the columns to their actual names. display_names_to_names = dict( (get_display_name(field), field.name, ) for field in exp.measurementmodel._meta.get_fields() ) column_name = display_names_to_names[column_display_name] # Get only values that are not NULL. filter_conds = {'experiment': exp, column_name+'__isnull': False} data = exp.measurementmodel.objects.filter(**filter_conds).values_list(column_name) xlabel = '{} {}'.format(exp.measurementmodel._meta.verbose_name, column_display_name) title = 'Distribution of {} {} {}'.format(exp.measurementmodel._meta.verbose_name, column_display_name, exp_id) else: # We're dealing with a fold change column. dividend_id, divisor_id = exp_id.split('_') dividend_exp = self.experiment_meta_model.objects.get(pk=dividend_id) divisor_exp = self.experiment_meta_model.objects.get(pk=divisor_id) if dividend_exp.measurementmodel != divisor_exp.measurementmodel: raise NotSupportedError('To plot a fold change the experiments ' 'must have the same datatype.') # Get the displayed experiments from the URL. exp_ids = get_ids(exp_id_str) requested_exps = self.experiment_model.objects.filter(pk__in=exp_ids) # Create the data table. result = DataResult(requested_exps, self.experiment_meta_model) data = result.get_foldchange(({'dividend': dividend_exp, 'divisor': divisor_exp},)) # Filter out NULL values. ## Flatten. data = sum(data, ()) data = list((datapoint for datapoint in data if datapoint is not None)) xlabel = 'log2 fold change {}/{}'.format(dividend_exp.id, divisor_exp.id) fold_change = True title = 'Distribution of fold change {}/{}'.format( dividend_exp.id, divisor_exp.id ) ylabel = 'Number of data points' # The y-label is always same for histograms. fig = plt.figure() ax = fig.add_subplot(111) data = np.array(data) if fold_change: # Calculate log2 for fold change and filter out inf and NaN values # as result from log(0) and log(-a). data = np.log2(data) data = data[~np.isinf(data) & ~np.isnan(data)] # Use `doane` estimator as using `auto` for non normal distributed data # takes forever. ax.hist(data, bins='doane', color='#005374') # Set labels and title. ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_title(title) # Store the plot in a byte stream and encode it as base64. buf = StringIO() fig.savefig(buf, format='svg') buf.seek(0) # rewind the data plt.close('all') response['status'] = True response['data'] = buf.read() return JsonResponse(response) except NotSupportedError as exc: response['status'] = False response['err_msg'] = str(exc) return JsonResponse(response) except: response['status'] = False response['err_msg'] = 'Internal Error: Column cannot be plotted.' return JsonResponse(response)