Helper functions for performing operations circumventing the ORM layer.
# standard library
from collections import OrderedDict, namedtuple

# Django
from django.db import connection

[docs]def dictfetchall(cursor): """Return all rows from a cursor as a dict.""" columns = [col[0] for col in cursor.description] return [ dict(zip(columns, row)) for row in cursor.fetchall() ]
[docs]def namedtuplefetchall(cursor): """Return all rows from a cursor as a namedtuple.""" desc = cursor.description nt_result = namedtuple('Result', [col[0] for col in desc]) return [nt_result(*row) for row in cursor.fetchall()]
[docs]def ordereddictfetchall(cursor): """Return all rows from a cursor as an OrdredeDict.""" columns = [col[0] for col in cursor.description] return [ OrderedDict(zip(columns, row)) for row in cursor.fetchall() ]
[docs]def from_db(sql, parameters=None, fetch_as='ordereddict'): """ Fetch values from the DB, given a SQL query. Args: sql (str): The SQL statement. Keyword Args: parameters: Parameters for a parametrized query. If given `sql` must contain the appropriate palceholders. Defaults to None. fetch_as (str): The data type as which the values should be fetched. Choices are 'ordereddict', 'dict', 'namedtuple' and 'tuple'. Raises: ValueError: If a unrecognized value for ``fetch_as`` is given. """ with connection.cursor() as cursor: if parameters is not None: cursor.execute(sql, parameters) else: cursor.execute(sql) if fetch_as == 'ordereddict': rows = ordereddictfetchall(cursor) elif fetch_as == 'dict': rows = dictfetchall(cursor) elif fetch_as == 'namedtuple': rows = namedtuplefetchall(cursor) elif fetch_as == 'tuple': rows = cursor.fetchall() else: raise ValueError('Values cannot be fetched as {fetch_type}. ' .format(fetch_type=fetch_as) + 'Choices are \'ordereddict\', \'dict\', \'namedtuple\' and \'tuple\'.') return rows
[docs]def exec_query(sql, parameters=None): """ Execute a plain SQL query. Use parameterized query if parameters are given. Args: sql (str): The SQL query. Keyword Args: parameters (iterable): An iterable of parameters, that will be autoescaped. """ with connection.cursor() as cursor: if parameters is not None: cursor.execute(sql, parameters) else: cursor.execute(sql)
[docs]def db_table_exists(table_name): """ Check whether a table with a specific name exists in the DB. Args: table_name (str): The name of the table to check for. Returns: bool: True if the table exists, else False. """ return table_name in connection.introspection.table_names()
[docs]def get_columnnames(table_name): """ Get the column names of a DB table. Args: table_name (str): The name of the table. Returns: tuple: The column names. """ query = """ SELECT * FROM %s LIMIT 1""" % table_name row = from_db(query, fetch_as='namedtuple') return row[0]._fields
[docs]def get_m2m_field(intermediary_model, related_model): """ Get the field of an intermediary model, that constitutes the relation to `related_model`. """ fields = [field for field in intermediary_model._meta.get_fields() if field.concrete] for field in fields: if getattr(field, 'target_field', False): if field.target_field.model == related_model: return field
[docs]def get_pk_query_name(model): """Format the primary key column of a model with its DB table.""" return '%s.%s' % (model._meta.db_table,
[docs]def get_fk_query_name(model, related_model): """ Format the DB column name of a foreign key field of a model with the DB table of the model. Finds the foreign key relating to related model automatically, but assumes that there is only one related field. Args: model (Model): The model for which the foreign key field is searched. related_model (Model): A model related to `model`. Returns: str: The formated foreign key column name. """ related_field = [f for f in model._meta.get_fields() if f.is_relation and f.concrete and f.related_model == related_model] return '%s.%s' % (model._meta.db_table, related_field[0].column)
[docs]def get_field_query_name(model, field): """Format the DB column name of a field with the DB table of its model.""" return '%s.%s' % (model._meta.db_table, field.column)