# Source code for tldap.query

# Copyright 2012-2014 Brian May
#
# This file is part of python-tldap.
#
# python-tldap is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# python-tldap is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with python-tldap.  If not, see <http://www.gnu.org/licenses/>.

""" Used to perform LDAP queries. """

from __future__ import absolute_import

import six

import ldap3

import tldap
import tldap.manager
import tldap.helpers
import tldap.filter

import django.utils.tree
from django.utils.encoding import python_2_unicode_compatible

# Used to control how many objects are worked with at once in some cases.
# In this module it is the default number of results QuerySet._fill_cache
# pulls from the pending result iterator when no explicit count is given.
ITER_CHUNK_SIZE = 100

# The maximum number of items to display in the string form of a QuerySet
# (QuerySet.__str__); one extra item is fetched to detect truncation.
REPR_OUTPUT_SIZE = 20


@python_2_unicode_compatible
class QuerySet(object):
    """
    Represents a lazy database lookup for a set of objects.

    No search is performed until the results are actually needed (iteration,
    ``len()``, indexing, ...). Results are cached in ``_result_cache``;
    ``_iter`` holds the not yet exhausted result iterator while the cache is
    only partially populated.
    """

    def __init__(self, cls, using, settings, base_dn):
        """
        :param cls: class used to construct every result object.
        :param using: alias of the connection to search with, or None for
            the default alias.
        :param settings: passed through unchanged to every created object.
        :param base_dn: base DN to search, or None to ask ``cls`` for its
            default base DN.
        """
        assert cls is not None

        self._from_cls = None
        self._cls = cls
        self._dn = None
        self._alias = using
        self._settings = settings
        self._query = None
        self._base_dn = base_dn
        self._iter = None
        self._result_cache = None
        self._limits = None

    @property
    def model(self):
        """ The class used to construct result objects. """
        return self._cls

    ########################
    # PYTHON MAGIC METHODS #
    ########################

    def __str__(self):
        """ Text form of (at most REPR_OUTPUT_SIZE of) the results. """
        # fetch one more than we display so we can tell if we truncated
        data = list(self[:REPR_OUTPUT_SIZE + 1])
        if len(data) > REPR_OUTPUT_SIZE:
            data[-1] = "...(remaining elements truncated)..."
        return six.text_type(data)

    def __len__(self):
        """ Number of results; runs the search if it was not run yet. """
        # Since __len__ is called quite frequently (for example, as part of
        # list(qs), we make some effort here to be as efficient as possible
        # whilst not messing up any existing iterators against the QuerySet.
        if self._result_cache is None:
            if self._iter:
                self._result_cache = list(self._iter)
            else:
                self._result_cache = list(self.iterator())
        elif self._iter:
            self._result_cache.extend(self._iter)
        return len(self._result_cache)

    def __iter__(self):
        """ Iterate over the results, filling the cache on demand. """
        if self._result_cache is None:
            self._iter = self.iterator()
            self._result_cache = []
        if self._iter:
            return self._result_iter()
        # Python's list iterator is better than our version when we're just
        # iterating over the cache.
        return iter(self._result_cache)

    def _result_iter(self):
        """
        Generator over the result cache that pulls more results from the
        pending iterator whenever the cache has been used up.
        """
        pos = 0
        while True:
            upper = len(self._result_cache)
            while pos < upper:
                yield self._result_cache[pos]
                pos = pos + 1
            if not self._iter:
                # A plain return ends the generator. Raising StopIteration
                # here is turned into a RuntimeError by PEP 479.
                return
            if len(self._result_cache) <= pos:
                self._fill_cache()

    def __getitem__(self, k):
        """
        Retrieves an item or slice from the set of results.

        :param k: a non-negative integer or a slice with non-negative bounds.
        :returns: a single object for an integer; for a slice either a list
            (results already cached, or a step was given) or a new lazy
            QuerySet limited to the requested range.
        :raises TypeError: if ``k`` is neither an integer nor a slice.
        :raises IndexError: for negative indexes/bounds or an index beyond
            the available results.
        """
        if not isinstance(k, (slice,) + six.integer_types):
            raise TypeError("QuerySet indices must be integers or slices.")
        if not isinstance(k, slice) and (k < 0):
            raise IndexError("Negative indexing is not supported.")
        if isinstance(k, slice) and (k.start is not None and k.start < 0):
            raise IndexError("Negative indexing is not supported.")
        if isinstance(k, slice) and (k.stop is not None and k.stop < 0):
            raise IndexError("Negative indexing is not supported.")

        if self._result_cache is not None:
            if self._iter is not None:
                # The result cache has only been partially populated, so we
                # may need to fill it out a bit more.
                if isinstance(k, slice):
                    if k.stop is not None:
                        # Some people insist on passing in strings here.
                        bound = int(k.stop)
                    else:
                        bound = None
                else:
                    bound = k + 1

                if bound is None:
                    # Open ended slice: every remaining result is needed.
                    # (Comparing an int with None fails on Python 3 and
                    # silently returned partial results on Python 2.)
                    self._result_cache.extend(self._iter)
                    self._iter = None
                elif len(self._result_cache) < bound:
                    self._fill_cache(bound - len(self._result_cache))
            return self._result_cache[k]

        if isinstance(k, slice):
            qs = self._clone()
            if k.start is not None:
                start = int(k.start)
            else:
                start = 0
            if k.stop is not None:
                stop = int(k.stop)
            else:
                stop = None
            qs._limits = start, stop
            # A step forces evaluation. Use an explicit branch so that an
            # empty stepped result is returned as [] and not as the QuerySet
            # (which is what "a and b or c" did for a falsy b).
            if k.step:
                return list(qs)[::k.step]
            return qs

        qs = self._clone()
        qs._limits = k, k + 1
        return list(qs)[0]

    def __and__(self, other):
        """ Returns a new QuerySet matching both self and other. """
        assert isinstance(other, QuerySet)
        assert self._alias == other._alias
        assert self._settings == other._settings
        self._merge_sanity_check(other)
        if self._query is None:
            # self has no restrictions, so the result is whatever other is
            return other._clone()
        if isinstance(other, EmptyQuerySet):
            return other._clone()
        if other._query is None:
            # other has no restrictions; avoid combining a Q with None
            return self._clone()
        combined = self._clone()
        combined._query = combined._query & other._query
        return combined

    def __or__(self, other):
        """ Returns a new QuerySet matching either self or other. """
        assert isinstance(other, QuerySet)
        assert self._alias == other._alias
        assert self._settings == other._settings
        self._merge_sanity_check(other)
        combined = self._clone()
        if self._query is None:
            # self already has no restrictions
            return combined
        if isinstance(other, EmptyQuerySet):
            return combined
        if other._query is None:
            # other has no restrictions; avoid combining a Q with None
            return other._clone()
        combined._query = combined._query | other._query
        return combined

    ####################################
    # METHODS THAT DO DATABASE QUERIES #
    ####################################

    def _get_filter_item(self, name, operation, value):
        """
        A field could be found for this term, try to get filter string for it.

        :param name: attribute name to compare.
        :param operation: None for an equality match or "contains" for a
            substring match.
        :param value: text or bytes value to search for.
        :raises ValueError: if ``operation`` is not a known operation.
        """
        assert isinstance(name, six.string_types)
        assert isinstance(value, six.string_types + (bytes,))
        if operation is None:
            return tldap.filter.filter_format(
                "(%s=%s)", [name, value])
        elif operation == "contains":
            assert value != ""
            return tldap.filter.filter_format(
                "(%s=*%s*)", [name, value])
        else:
            raise ValueError("Unknown search operation %s" % operation)

    def _get_filter_term(self, name, operation, value, field=None):
        """
        Build the filter string for one (name, value) leaf of a Q tree.

        A list value produces an AND over one term per element. When
        ``field`` is given every value is first passed through
        ``field.value_to_db``.
        """
        def to_db(raw):
            if field is None:
                return raw
            return field.value_to_db(raw)

        if isinstance(value, list):
            items = [
                self._get_filter_item(name, operation, to_db(v))
                for v in value
            ]
            # join the per-value terms, not the terms collected so far for
            # the enclosing node
            return "(&" + "".join(items) + ")"
        return self._get_filter_item(name, operation, to_db(value))

    def _get_filter(self, q):
        """
        Translate the Q tree into a filter string to search for.

        :param q: the Q tree node to translate.
        :returns: the filter string for ``q`` and everything below it.
        :raises ValueError: if a connector is unknown or a name does not
            match any field of the class.
        """
        # check the details are valid
        if q.negated and len(q.children) == 1:
            # single negated term; the connector is not needed
            op = None
        elif q.connector == tldap.Q.AND:
            op = "&"
        elif q.connector == tldap.Q.OR:
            op = "|"
        else:
            raise ValueError("Invalid value of op found")

        # scan through every child; each child adds exactly one term
        search = []
        for child in q.children:
            # if this child is a node, then descend into it
            if isinstance(child, django.utils.tree.Node):
                search.append(self._get_filter(child))
                continue

            # otherwise get the values in this node
            name, value = child

            # split the name if possible
            name, _, operation = name.rpartition("__")
            if name == "":
                name, operation = operation, None

            # replace pk with the real attribute
            if name == "pk":
                name = self._cls._meta.pk

            # DN is a special case
            if name == "dn":
                search.append(
                    self._get_filter_term("entryDN:", operation, value))
                continue

            # try to find field associated with name
            try:
                field = self._cls._meta.get_field_by_name(name)
            except KeyError:
                # no field found
                raise ValueError(
                    "Cannot do a search on %s "
                    "as we cannot find the field" % name)

            # try to turn list into single value
            if isinstance(value, list) and len(value) == 1:
                value = value[0]
                # NOTE(review): this rejects non-str single values before
                # value_to_db is applied - confirm that is intended.
                assert isinstance(value, str)

            search.append(
                self._get_filter_term(name, operation, value, field))

        # output the results
        if len(search) == 1:
            # just one term, no connector required
            result = search[0]
        else:
            # multiple (or zero) terms
            result = "(" + op + "".join(search) + ")"

        if q.negated:
            # Always wrap negated nodes, also when they do not have exactly
            # one child; otherwise the negation would be silently dropped.
            result = "(!" + result + ")"
        return result

    def _clone_query(self, q):
        """ Returns a copy of the Q tree; leaf children are shared. """
        dst = tldap.Q()
        dst.connector = q.connector
        dst.negated = q.negated

        # scan through every child
        for child in q.children:
            # if this child is a node, then descend into it
            if isinstance(child, django.utils.tree.Node):
                dst.children.append(self._clone_query(child))
            else:
                dst.children.append(child)
        return dst

    def _expand_query(self, q):
        """
        Expands expandable q items, i.e. for relations between objects.

        :returns: the expanded Q tree, or None if no results are possible.
        :raises ValueError: if a name is neither a field nor a
            LinkDescriptor attribute of the class.
        """
        dst = tldap.Q()
        dst.connector = q.connector
        dst.negated = q.negated

        # scan through every child
        for child in q.children:
            # if this child is a node, then descend into it
            if isinstance(child, django.utils.tree.Node):
                dst.children.append(self._expand_query(child))
                continue

            # otherwise get the values in this node
            name, value = child

            # split the name if possible
            name, _, operation = name.rpartition("__")
            if name == "":
                name, operation = operation, None

            # replace pk with the real attribute
            if name == "pk":
                name = self._cls._meta.pk

            # dn searches are a special case
            if name == "dn":
                dst.children.append(child)
                continue

            # try to find field associated with name
            try:
                self._cls._meta.get_field_by_name(name)
                dst.children.append(child)
                continue
            except KeyError:
                # no field found, try to lookup linked models
                pass

            # get raw value from class
            cls_value = self._cls.__dict__.get(name, None)

            # fail for cases we don't understand
            if cls_value is None:
                raise ValueError(
                    "Cannot do a search on %s "
                    "as we do not know about it" % name)

            # fail for cases we don't understand
            if not isinstance(cls_value, tldap.manager.LinkDescriptor):
                raise ValueError(
                    "Cannot do a search on %s "
                    "as we do not know the type" % name)

            # ask the LinkDescriptor for a q tree
            child = cls_value.get_q_for_linked_instance(value, operation)

            # if child is None, then no results can be found
            # we need to handle this later.
            dst.children.append(child)

        # go through results
        new_children = []
        for term in dst.children:
            # if result is not None, keep it
            if term is not None:
                new_children.append(term)

            # a result of None means 0 results
            elif q.negated:
                # not 0 results is all results
                return tldap.Q(objectClass='*')

            elif q.connector == tldap.Q.AND:
                # 0 results and anything is still 0 results
                return None

            elif q.connector == tldap.Q.OR:
                # 0 results or anything is just anything
                pass

        dst.children = new_children

        # output the results
        if len(dst.children) == 0:
            # no search terms, all terms were None
            return None
        else:
            # multiple terms
            return dst

    def _get_search_params(self):
        """
        Work out everything required to run the search.

        :returns: tuple of (alias, connection, base_dn, scope, search_filter,
            field_names); search_filter is None if no results are possible.
        """
        # set the database we should use as required
        alias = self._alias or tldap.DEFAULT_LDAP_ALIAS
        connection = tldap.connections[alias]

        # get object classes to search
        if self._from_cls is None:
            object_classes = (
                self._cls._meta.search_classes
                or self._cls._meta.object_classes)
        else:
            object_classes = self._from_cls._meta.search_classes

        # add object classes to search array
        query = tldap.Q()
        for oc in object_classes:
            query = query & tldap.Q(objectClass=oc)

        # do a SUBTREE search
        scope = ldap3.SEARCH_SCOPE_WHOLE_SUBTREE

        # add requested query
        if self._query is not None:
            requested_query = self._expand_query(self._query)
            if requested_query is not None:
                query = query & requested_query
            else:
                # the requested query can never match anything
                query = None

        # create a "list" of base_dn to search
        base_dn = self.get_base_dn()
        assert base_dn is not None

        # get list of field names we support
        field_names = self._cls._meta.get_all_field_names()

        # construct search filter string
        if query is not None:
            search_filter = self._get_filter(query)
        else:
            search_filter = None

        return alias, connection, base_dn, scope, search_filter, field_names

    def iterator(self):
        """
        An iterator over the results from applying this QuerySet to the
        database.
        """
        # get search parameters
        alias, connection, base_dn, scope, search_filter, field_names = (
            self._get_search_params())

        if search_filter is None:
            # no results possible, do not bother the server
            return

        if self._limits is not None:
            start, stop = self._limits
            limit = stop
        else:
            start = 0
            limit = None

        # repeat for every dn
        fields = self._cls._meta.fields

        # get the results
        for i in connection.search(base_dn, scope, search_filter,
                                   field_names, limit=limit):
            # skip the results before the requested start position
            if start > 0:
                start = start - 1
                continue

            # create new object
            o = self._cls(
                dn=i[0],
                using=alias,
                settings=self._settings,
            )

            # set the other fields
            for field in fields:
                name = field.name
                value = i[1].get(name, [])
                value = field.to_python(value)
                setattr(o, name, value)

            # save raw db values for latter use
            o._db_values = (
                tldap.helpers.CaseInsensitiveDict(i[1]))

            # give caller this result
            yield o

    def get(self, *args, **kwargs):
        """
        Performs the query and returns a single object matching the given
        keyword arguments.

        :raises DoesNotExist: (of the result class) if nothing matched.
        :raises MultipleObjectsReturned: (of the result class) if more than
            one object matched.
        """
        qs = self.filter(*args, **kwargs)
        num = len(qs)
        if num == 1:
            return qs._result_cache[0]
        if not num:
            raise self._cls.DoesNotExist(
                "%s matching query does not exist."
                % self._cls._meta.object_name)
        raise self._cls.MultipleObjectsReturned(
            "get() returned more than one %s "
            "-- it returned %s! Lookup parameters were %s"
            % (self._cls._meta.object_name, num, kwargs))

    def create(self, **kwargs):
        """
        Creates a new object with the given kwargs, saving it to the database
        and returning the created object.
        """
        obj = self._cls(settings=self._settings, using=self._alias, **kwargs)
        obj.save(force_add=True)
        return obj

    def get_or_create(self, **kwargs):
        """
        Looks up an object with the given kwargs, creating one if necessary.
        Returns a tuple of (object, created), where created is a boolean
        specifying whether an object was created.

        The optional ``defaults`` keyword is a dict of extra values that are
        only used when creating the object, not for the lookup.
        """
        assert kwargs, \
            'get_or_create() must be passed at least one keyword argument'
        defaults = kwargs.pop('defaults', {})
        try:
            return self.get(**kwargs), False
        except self._cls.DoesNotExist:
            params = dict(kwargs)
            params.update(defaults)
            obj = self._cls(
                settings=self._settings, using=self._alias, **params)
            obj.save(force_add=True)
            return obj, True

    def none(self):
        """
        Returns an empty QuerySet.
        """
        return self._clone(klass=EmptyQuerySet)

    ##################################################################
    # PUBLIC METHODS THAT ALTER ATTRIBUTES AND RETURN A NEW QUERYSET #
    ##################################################################

    def filter(self, *args, **kwargs):
        """
        Returns a new QuerySet instance with the args ANDed to the existing
        set.
        """
        return self._filter_or_exclude(False, *args, **kwargs)

    def exclude(self, *args, **kwargs):
        """
        Returns a new QuerySet instance with NOT (args) ANDed to the existing
        set.
        """
        return self._filter_or_exclude(True, *args, **kwargs)

    def _filter_or_exclude(self, negate, *args, **kwargs):
        """
        Returns a clone with Q(*args, **kwargs), negated if ``negate`` is
        true, ANDed to the existing query.
        """
        clone = self._clone()
        if negate:
            q = ~tldap.Q(*args, **kwargs)
        else:
            q = tldap.Q(*args, **kwargs)
        if clone._query is None:
            clone._query = q
        else:
            clone._query = clone._query & q
        return clone

    def using(self, using, settings=None):
        """
        Selects which database this QuerySet should execute its query
        against. Note that the settings of the clone are replaced by
        ``settings``, also when it is None.
        """
        clone = self._clone()
        clone._alias = using
        clone._settings = settings
        return clone

    def base_dn(self, base_dn):
        """ Returns a clone that searches below ``base_dn``. """
        qs = self._clone()
        qs._base_dn = base_dn
        return qs

    def get_base_dn(self):
        """
        Returns the base DN to search; falls back to the default base DN of
        the result class if none was set explicitly.
        """
        base_dn = self._base_dn
        if base_dn is None:
            base_dn = self._cls.get_default_base_dn(
                self._alias, self._settings)
        return base_dn

    def convert(self, cls):
        """
        Returns a clone that takes the object classes to search for from
        ``cls`` while still constructing results with the original class.
        """
        qs = self._clone()
        qs._from_cls = cls
        return qs

    ###################################
    # PUBLIC INTROSPECTION ATTRIBUTES #
    ###################################

    ###################
    # PRIVATE METHODS #
    ###################

    def _clone(self, klass=None):
        """
        Returns a new, unevaluated query set of type ``klass`` (default: the
        type of self) with a copy of the query. The result cache and the
        slice limits are not copied.
        """
        if klass is None:
            klass = self.__class__
        qs = klass(self._cls, self._alias, self._settings, self._base_dn)
        if self._query is not None:
            qs._query = self._clone_query(self._query)
        else:
            qs._query = None
        qs._base_dn = self._base_dn
        qs._from_cls = self._from_cls
        return qs

    def _fill_cache(self, num=None):
        """
        Fills the result cache with 'num' more entries (or until the results
        iterator is exhausted). Without 'num', ITER_CHUNK_SIZE entries are
        requested.
        """
        if self._iter:
            try:
                for i in range(num or ITER_CHUNK_SIZE):
                    self._result_cache.append(next(self._iter))
            except StopIteration:
                # nothing left to fetch, drop the exhausted iterator
                self._iter = None

    def _merge_sanity_check(self, other):
        """
        Checks that we are merging two comparable QuerySet classes. By default
        this does nothing; it is a hook for subclasses.
        """
        pass
class EmptyQuerySet(QuerySet):
    """
    Represents an empty query set with no results.

    The result cache starts out as an empty list, so nothing is ever
    searched for.
    """

    def __init__(self, cls, alias, settings, base_dn):
        super(EmptyQuerySet, self).__init__(cls, alias, settings, base_dn)
        # an already "evaluated" cache without any results
        self._result_cache = []

    def __and__(self, other):
        """ Nothing AND anything is still nothing. """
        assert isinstance(other, QuerySet)
        return self._clone()

    def __or__(self, other):
        """ Nothing OR other is just other. """
        assert isinstance(other, QuerySet)
        return other._clone()

    def delete(self):
        """ There is nothing to delete. """
        pass

    def _clone(self, klass=None, **kwargs):
        """ Returns a clone that has an empty result cache too. """
        c = super(EmptyQuerySet, self)._clone(klass, **kwargs)
        c._result_cache = []
        return c

    def iterator(self):
        """ Returns an iterator that is exhausted immediately. """
        # The previous "yield iter([]).next()" relied on the Python 2 only
        # .next() method and on StopIteration escaping a generator, which
        # PEP 479 turns into a RuntimeError.
        return iter(())

    def all(self):
        """
        Always returns EmptyQuerySet.
        """
        return self

    def filter(self, *args, **kwargs):
        """
        Always returns EmptyQuerySet.
        """
        return self

    def exclude(self, *args, **kwargs):
        """
        Always returns EmptyQuerySet.
        """
        return self