# ====================================================================
# Copyright (c) 2004-2005 Open Source Applications Foundation.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions: 
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software. 
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
# ====================================================================
#

from unittest import TestCase, main
from PyLucene import *


class PhraseQueryTestCase(TestCase):
    """
    Unit tests ported from Java Lucene
    """

    def setUp(self):

        self.directory = RAMDirectory()
        writer = IndexWriter(self.directory, WhitespaceAnalyzer(), True)
    
        doc = Document()
        doc.add(Field("field", "one two three four five",
                      Field.Store.YES, Field.Index.TOKENIZED))
        writer.addDocument(doc)
    
        writer.optimize()
        writer.close()

        self.searcher = IndexSearcher(self.directory)
        self.query = PhraseQuery()

    def tearDown(self):

        self.searcher.close()
        self.directory.close()

    def testNotCloseEnough(self):

        self.query.setSlop(2)
        self.query.add(Term("field", "one"))
        self.query.add(Term("field", "five"))
        hits = self.searcher.search(self.query)
        self.assertEqual(0, len(hits))

    def testBarelyCloseEnough(self):

        self.query.setSlop(3)
        self.query.add(Term("field", "one"))
        self.query.add(Term("field", "five"))
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits))

    def testExact(self):
        """
        Ensures slop of 0 works for exact matches, but not reversed
        """

        # slop is zero by default
        self.query.add(Term("field", "four"))
        self.query.add(Term("field", "five"))
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits), "exact match")

        self.query = PhraseQuery()
        self.query.add(Term("field", "two"))
        self.query.add(Term("field", "one"))
        hits = self.searcher.search(self.query)
        self.assertEqual(0, len(hits), "reverse not exact")

    def testSlop1(self):

        # Ensures slop of 1 works with terms in order.
        self.query.setSlop(1)
        self.query.add(Term("field", "one"))
        self.query.add(Term("field", "two"))
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits), "in order")

        # Ensures slop of 1 does not work for phrases out of order
        # must be at least 2.
        self.query = PhraseQuery()
        self.query.setSlop(1)
        self.query.add(Term("field", "two"))
        self.query.add(Term("field", "one"))
        hits = self.searcher.search(self.query)
        self.assertEqual(0, len(hits), "reversed, slop not 2 or more")

    def testOrderDoesntMatter(self):
        """
        As long as slop is at least 2, terms can be reversed
        """

        self.query.setSlop(2) # must be at least two for reverse order match
        self.query.add(Term("field", "two"))
        self.query.add(Term("field", "one"))
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits), "just sloppy enough")

        self.query = PhraseQuery()
        self.query.setSlop(2)
        self.query.add(Term("field", "three"))
        self.query.add(Term("field", "one"))
        hits = self.searcher.search(self.query)
        self.assertEqual(0, len(hits), "not sloppy enough")

    def testMulipleTerms(self):
        """
        slop is the total number of positional moves allowed
        to line up a phrase
        """
        
        self.query.setSlop(2)
        self.query.add(Term("field", "one"))
        self.query.add(Term("field", "three"))
        self.query.add(Term("field", "five"))
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits), "two total moves")

        self.query = PhraseQuery()
        self.query.setSlop(5) # it takes six moves to match this phrase
        self.query.add(Term("field", "five"))
        self.query.add(Term("field", "three"))
        self.query.add(Term("field", "one"))
        hits = self.searcher.search(self.query)
        self.assertEqual(0, len(hits), "slop of 5 not close enough")

        self.query.setSlop(6)
        hits = self.searcher.search(self.query)
        self.assertEqual(1, len(hits), "slop of 6 just right")

    def testPhraseQueryWithStopAnalyzer(self):

        directory = RAMDirectory()
        stopAnalyzer = StopAnalyzer()
        writer = IndexWriter(directory, stopAnalyzer, True)
        doc = Document()
        doc.add(Field("field", "the stop words are here",
                      Field.Store.YES, Field.Index.TOKENIZED))
        writer.addDocument(doc)
        writer.close()

        searcher = IndexSearcher(directory)

        # valid exact phrase query
        query = PhraseQuery()
        query.add(Term("field","stop"))
        query.add(Term("field","words"))
        hits = searcher.search(query)
        self.assertEqual(1, len(hits))

        # currently StopAnalyzer does not leave "holes", so this matches.
        query = PhraseQuery()
        query.add(Term("field", "words"))
        query.add(Term("field", "here"))
        hits = searcher.search(query)
        self.assertEqual(1, len(hits))

        searcher.close()
  
    def testPhraseQueryInConjunctionScorer(self):

        directory = RAMDirectory()
        writer = IndexWriter(directory, WhitespaceAnalyzer(), True)
    
        doc = Document()
        doc.add(Field("source", "marketing info",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        writer.addDocument(doc)
    
        doc = Document()
        doc.add(Field("contents", "foobar",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        doc.add(Field("source", "marketing info",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        writer.addDocument(doc)
    
        writer.optimize()
        writer.close()
    
        searcher = IndexSearcher(directory)
    
        phraseQuery = PhraseQuery()
        phraseQuery.add(Term("source", "marketing"))
        phraseQuery.add(Term("source", "info"))
        hits = searcher.search(phraseQuery)
        self.assertEqual(2, len(hits))
    
        termQuery = TermQuery(Term("contents","foobar"))
        booleanQuery = BooleanQuery()
        booleanQuery.add(termQuery, BooleanClause.Occur.MUST)
        booleanQuery.add(phraseQuery, BooleanClause.Occur.MUST)
        hits = searcher.search(booleanQuery)
        self.assertEqual(1, len(hits))
    
        searcher.close()
    
        writer = IndexWriter(directory, WhitespaceAnalyzer(), True)
        doc = Document()
        doc.add(Field("contents", "map entry woo",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        writer.addDocument(doc)

        doc = Document()
        doc.add(Field("contents", "woo map entry",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        writer.addDocument(doc)

        doc = Document()
        doc.add(Field("contents", "map foobarword entry woo",
                      Field.Store.YES, Field.Index.TOKENIZED,
                      Field.TermVector.YES))
        writer.addDocument(doc)

        writer.optimize()
        writer.close()
    
        searcher = IndexSearcher(directory)
    
        termQuery = TermQuery(Term("contents", "woo"))
        phraseQuery = PhraseQuery()
        phraseQuery.add(Term("contents", "map"))
        phraseQuery.add(Term("contents", "entry"))
    
        hits = searcher.search(termQuery)
        self.assertEqual(3, len(hits))
        hits = searcher.search(phraseQuery)
        self.assertEqual(2, len(hits))
    
        booleanQuery = BooleanQuery()
        booleanQuery.add(termQuery, BooleanClause.Occur.MUST)
        booleanQuery.add(phraseQuery, BooleanClause.Occur.MUST)
        hits = searcher.search(booleanQuery)
        self.assertEqual(2, len(hits))
    
        booleanQuery = BooleanQuery()
        booleanQuery.add(phraseQuery, BooleanClause.Occur.MUST)
        booleanQuery.add(termQuery, BooleanClause.Occur.MUST)
        hits = searcher.search(booleanQuery)
        self.assertEqual(2, len(hits))
    
        searcher.close()
        directory.close()


if __name__ == "__main__":
    import sys

    # With '-loop' on the command line the test suite is run over and
    # over again until the process is interrupted; otherwise it is run
    # once.  The flag is removed from sys.argv before main() is called
    # so that unittest does not try to interpret it.
    if '-loop' in sys.argv:
        sys.argv.remove('-loop')
        while True:
            try:
                main()
            except KeyboardInterrupt:
                # Let Ctrl-C actually stop the loop instead of being
                # swallowed along with everything else.
                raise
            except (SystemExit, Exception):
                # unittest's main() finishes by exiting the interpreter
                # (SystemExit); ignore that, and any other error from a
                # run, so the next iteration can start.
                pass
    else:
        main()
