"""
Some unit tests not yet fitting anywhere else.
"""

#c Copyright 2008-2019, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


from cStringIO import StringIO
import cgi
import contextlib
import datetime
import httplib
import new
import os
import re
import shutil
import sys
import tempfile
import unittest

import numpy

from gavo.helpers import testhelpers

from gavo import api
from gavo import base
from gavo import rscdesc
from gavo import votable
from gavo import stc
from gavo import utils
from gavo.user import upgrade
from gavo.helpers import filestuff
from gavo.helpers import processing
from gavo.protocols import creds
from gavo.utils import DEG
from gavo.utils import pyfits
from gavo.utils import pgsphere
from gavo.votable import tapquery

import tresc


class RenamerDryTest(unittest.TestCase):
	"""tests for some aspects of the file renamer without touching the file system.
	"""
	def testSerialization(self):
		"""tests for correct serialization of clobbering renames.
		"""
		f = filestuff.FileRenamer({})
		fileMap = {'a': 'b', 'b': 'c', '2': '3', '1': '2'}
		self.assertEqual(f.makeRenameProc(fileMap),
			[('b', 'c'), ('a', 'b'), ('2', '3'), ('1', '2')])
	
	def testCycleDetection(self):
		"""tests for cycle detection in renaming recipies.
		"""
		f = filestuff.FileRenamer({})
		fileMap = {'a': 'b', 'b': 'c', 'c': 'a'}
		self.assertRaises(filestuff.Error, f.makeRenameProc, fileMap)


class RenamerWetTest(unittest.TestCase):
	"""tests for behaviour of the file renamer on the file system.
	"""
	def setUp(self):
		def touch(name):
			f = open(name, "w")
			f.close()
		self.testDir = tempfile.mkdtemp("testrun")
		for fName in ["a.fits", "a.txt", "b.txt", "b.jpeg", "foo"]:
			touch(os.path.join(self.testDir, fName))

	def tearDown(self):
		shutil.rmtree(self.testDir, onerror=lambda exc: None)

	def testOperation(self):
		"""tests an almost-realistic application
		"""
		f = filestuff.FileRenamer.loadFromFile(
			StringIO("a->b \nb->c\n 2->3\n1 ->2\n\n# a comment\n"
				"foo-> bar\n"))
		f.renameInPath(self.testDir)
		found = set(os.listdir(self.testDir))
		expected = set(["b.fits", "b.txt", "c.txt", "c.jpeg", "bar"])
		self.assertEqual(found, expected)
	
	def testNoClobber(self):
		"""tests for effects of repeated application.
		"""
		f = filestuff.FileRenamer.loadFromFile(
			StringIO("a->b \nb->c\n 2->3\n1 ->2\n\n# a comment\n"
				"foo-> bar\n"))
		f.renameInPath(self.testDir)
		self.assertRaises(filestuff.Error, f.renameInPath, self.testDir)


class TimeCalcTest(testhelpers.VerboseTest):
	"""tests for time transformations.
	"""
	def testJYears(self):
		self.assertEqual(stc.jYearToDateTime(1991.25),
			datetime.datetime(1991, 04, 02, 13, 30, 00))
		self.assertEqual(stc.jYearToDateTime(2005.0),
			datetime.datetime(2004, 12, 31, 18, 0))
	
	def testRoundtrip(self):
		for yr in range(2000):
			self.assertAlmostEqual(2010+yr/1000., stc.dateTimeToJYear(
				stc.jYearToDateTime(2010+yr/1000.)), 7,
				"Botched %f"%(2010+yr/1000.))

	def testBYears(self):
		self.assertEqual(stc.bYearToDateTime(1950.0),
			datetime.datetime(1949, 12, 31, 22, 9, 46, 861900))
	
	def testBRoundtrip(self):
		for yr in range(2000):
			self.assertAlmostEqual(1950+yr/1000., stc.dateTimeToBYear(
				stc.bYearToDateTime(1950+yr/1000.)), 7,
				"Botched %f"%(1950+yr/1000.))

	def testBesselLieske(self):
		"""check examples from Lieske, A&A 73, 282.
		"""
		for bessel, julian in [
				(1899.999142, 1900),
				(1900., 1900.000858),
				(1950., 1949.999790),
				(1950.000210, 1950.0),
				(2000.0, 1999.998722),
				(2000.001278, 2000.0)]:
			self.assertAlmostEqual(stc.dateTimeToJYear(stc.bYearToDateTime(bessel)),
				julian, places=5)

	def testJDDateTime(self):
		self.assertEqual(utils.formatISODT(stc.jdnToDateTime(2458383.5)),
			"2018-09-22T00:00:00Z")


class ProcessorTest(testhelpers.VerboseTest):
	"""tests for some aspects of helper file processors.

	Unfortunately, sequencing is important here, so we do it all in
	one test.  I guess one should rethink things here, but for now let's
	keep things simple.
	"""
	_rdText = """<resource schema="filetest"><data id="import">
		<sources pattern="*.fits"/><fitsProdGrammar/>
		</data></resource>"""

	def _writeFITS(self, destPath, seed):
		hdu = pyfits.PrimaryHDU(numpy.zeros((2,seed+1), 'i2'))
		hdu.header.set("SEED", seed, "initial number")
		hdu.header.set("WEIRD", "W"*seed)
		hdu.header.set("RECIP", 1./(1+seed))
		hdu.writeto(destPath)

	def setUp(self):
		self.resdir = os.path.join(base.getConfig("tempDir"), "filetest")
		self.origInputs = base.getConfig("inputsDir")
		base.setConfig("inputsDir", base.getConfig("tempDir"))
		if os.path.exists(self.resdir): # Leftover from previous run?
			return
		os.mkdir(self.resdir)
		for i in range(10):
			self._writeFITS(os.path.join(self.resdir, "src%d.fits"%i), i)
		f = open(os.path.join(self.resdir, "filetest.rd"), "w")
		f.write(self._rdText)
		f.close()

	def tearDown(self):
		base.setConfig("inputsDir", self.origInputs)
		shutil.rmtree(self.resdir, True)

	class SimpleProcessor(processing.HeaderProcessor):
		def __init__(self, *args, **kwargs):
			processing.HeaderProcessor.__init__(self, *args, **kwargs)
			self.headersBuilt = 0

		def _isProcessed(self, srcName):
			return "SQUARE" in self.getPrimaryHeader(srcName)

		def _getHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**2)
			self.headersBuilt += 1
			return hdr

	def _getHeader(self, srcName):
		hdus = pyfits.open(os.path.join(self.resdir, srcName))
		hdr = hdus[0].header
		hdus.close()
		return hdr

	def _testPlainRun(self):
		# procmain reads argv, don't confuse it
		sys.argv = ["test", "--bail"]
		# Normal run, no headers present yet
		proc, stdout, errs = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(errs, "")
		self.assertEqual(stdout.split('\r')[-1].strip(), 
			"10 files processed, 0 files with errors")
		self.assertEqual(proc.headersBuilt, 10)
		self.failUnless(os.path.exists(
			os.path.join(self.resdir, "src9.fits.hdr")))
		self.failUnless("SQUARE" in self._getHeader("src9.fits.hdr"))
		# we don't run with applyHeaders here
		self.failIf("SQUARE" in self._getHeader("src9.fits"))

	def _testRespectCaches(self):
		"""tests that no processing is done when cached headers are there.

		This needs to run after _testPlainRun.
		"""
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(stdout.split('\r')[-1].strip(), 
			"10 files processed, 0 files with errors")
		self.assertEqual(proc.headersBuilt, 0)
	
	def _testNoCompute(self):
		"""tests that no computations take place with --no-compute.
		"""
		sys.argv = ["misctest.py", "--no-compute"]
		os.unlink(os.path.join(self.resdir, "src4.fits.hdr"))
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(proc.headersBuilt, 0)

	def _testRecompute(self):
		"""tests that missing headers are recomputed.

		This needs to run before _testApplyCaches and after _testNoCompute.
		"""
		sys.argv = ["misctest.py"]
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(stdout.split('\r')[-1].strip(), 
			"10 files processed, 0 files with errors")
		self.assertEqual(proc.headersBuilt, 1)

	def _testApplyCaches(self):
		"""tests the application of headers to sources.

		This needs to run after _testPlainRun
		"""
		sys.argv = ["misctest.py", "--apply"]
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(stdout.split('\r')[-1].strip(), 
			"10 files processed, 0 files with errors")
		self.assertEqual(proc.headersBuilt, 0)
		self.failUnless("SQUARE" in self._getHeader("src9.fits"))
		# see if data survived
		hdus = pyfits.open(os.path.join(self.resdir, "src9.fits"))
		na = hdus[0].data
		self.assertEqual(na.shape, (2, 10))
	
	def _testForcedRecompute(self):
		"""tests for working --reprocess.
		"""
		sys.argv = ["misctest.py", "--reprocess"]
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(proc.headersBuilt, 10)

	def _testBugfix(self):
		"""tests for working --reprocess --apply.

		This must run last since we're monkeypatching SimpleProcessor.
		"""
		def newGetHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**3)
			self.headersBuilt += 1
			return hdr
		sys.argv = ["misctest.py", "--reprocess", "--apply"]
		self.SimpleProcessor._getHeader = new.instancemethod(newGetHeader,
			None, self.SimpleProcessor)
		sys.argv = ["misctest.py", "--reprocess"]
		proc, stdout, _ = testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))
		self.assertEqual(self._getHeader("src6.fits.hdr")["SQUARE"], 216)

	def testAll(self):
		self._testPlainRun()
		self._testRespectCaches()
		self._testNoCompute()
		self._testRecompute()
		self._testForcedRecompute()
		self._testApplyCaches()
		self._testForcedRecompute()
		self._testBugfix()


class TestGroupsMembership(testhelpers.VerboseTest):
	resources = [('querier', tresc.testUsers)]

	def testGroupsForUser(self):
		"""tests for correctness of getGroupsForUser.
		"""
		self.assertEqual(creds.getGroupsForUser("X_test", "wrongpass"),
			set(), "Wrong password should yield empty set but doesn't")
		self.assertEqual(creds.getGroupsForUser("X_test", "megapass"),
			set(["X_test", "Y_test"]))
		self.assertEqual(creds.getGroupsForUser("Y_test", "megapass"),
			set(["Y_test"]))


@contextlib.contextmanager
def _fakeHTTPLib(respData="", respStatus=200, 
		mime="application/x-votable", exception=None):
	"""runs a test with a fake httplib connection maker.

	This is for TapquerySyncTest and similar.
	"""
	class FakeResult(object):
		status = respStatus

		def getheader(self, key):
			if key.lower()=='content-type':
				return mime
			else:
				self.fail()

		def read(self):
			return respData

	class FakeInfo(object):
		pass

	class FakeConnection(object):
		def __init__(self, *args, **kwargs):
			pass

		def request(self, method, path, data, headers):
			FakeInfo.lastData = data
			if exception is not None:
				raise exception

		def getresponse(self, *args, **kwargs):
			return FakeResult()

		def close(self):
			pass
	
	origConn = httplib.HTTPConnection
	httplib.HTTPConnection = FakeConnection
	try:
		yield FakeInfo
	finally:
		httplib.HTTPConnection = origConn


class TapquerySyncTest(testhelpers.VerboseTest):
# Tests for the tapquery sync object; since TAP queries are expensive,
# we only test things that don't actually run a query.  For more extensive
# exercising, see the taptest RD at the GAVO DC.
	endpoint = "http://dachstest"

	def testNoResult(self):
		job = votable.ADQLSyncJob(self.endpoint, 
			"select * from tap_schema.tables")
		self.assertRaisesWithMsg(tapquery.Error,
			"No result in so far",
			job.openResult,
			())

	def testWrongStatus(self):
		with _fakeHTTPLib(respData="oops", respStatus=404):
			job = votable.ADQLSyncJob(self.endpoint, 
				"select * from tap_schema.tables")
			self.assertRaises(tapquery.WrongStatus, job.start)
			self.assertEqual(job.getErrorFromServer(), "oops")

	def testHTTPError(self):
		import socket
		with _fakeHTTPLib(respData="oops", exception=socket.error("timeout")):
			job = votable.ADQLSyncJob(self.endpoint, 
				"select * from tap_schema.tables")
			self.assertRaises(tapquery.NetworkError, job.start)
			self.assertEqual(job.getErrorFromServer(), 
				'Problem connecting to dachstest (timeout)')

	def testTAPError(self):
		with _fakeHTTPLib(respData="""<VOTABLE version="1.2" xmlns:vot="http://www.ivoa.net/xml/VOTable/v1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ivoa.net/xml/VOTable/v1.2 http://vo.ari.uni-heidelberg.de/docs/schemata/VOTable-1.2.xsd"><RESOURCE type="results"><INFO name="QUERY_STATUS" value="ERROR">Could not parse your query: Expected "SELECT" (at char 0), (line:1, col:1)</INFO></RESOURCE></VOTABLE>""", respStatus=400):
			job = votable.ADQLSyncJob(self.endpoint, 
				"selct * from tap_schema.tables")
			self.assertRaises(tapquery.WrongStatus, job.start)
			self.assertEqual(job.getErrorFromServer(), 
				'Could not parse your query: Expected "SELECT" (at char 0),'
				' (line:1, col:1)')

	def testConstructionParameters(self):
		with _fakeHTTPLib(respData="ok") as fakeInfo:
			job = votable.ADQLSyncJob(self.endpoint, 
				"select * from tap_schema.tables", userParams={"MAXREC": 0})
			job.start()
			self.assertEqual(cgi.parse_qs(fakeInfo.lastData)["MAXREC"], ["0"])
		
	def testLaterParameters(self):
		with _fakeHTTPLib(respData="ok") as fakeInfo:
			job = votable.ADQLSyncJob(self.endpoint, 
				"select * from tap_schema.tables")
			job.setParameter("MAXREC", 0)
			job.start()
			self.assertEqual(cgi.parse_qs(fakeInfo.lastData)["MAXREC"], ["0"])


# this is http://alasky.u-strasbg.fr/footprints/cats/vizier/I/206?product=MOC&nside=64
_SAMPLE_FITS_MOC = """
eJztl2tMW2Ucxh810WhijNH4QY05iRq2qEOI2eYFI20PUGxPO9rKZbqtQGFFKKNQKAiJTjeNt206
3eYE4m2aZYOp87YpxGxeolHHFplm4gSdGjM1Xpbsg7dfwUQyKQRcjInnSX4f+p7zPu//9ianAbfX
7zENI8eYQEEj0yiOR5uaIjGjvNXw+pxGTbg5bOT63cYs/5y8SDwWbUhEZo/b4nAH/e6SNH7zJ1qc
XFZuiTtgpPG7fPp+Bf6SVBoT+80l3wIz1+OPJg1vorYp6otXRuKGs745Eg9XR4y68DJjVl04adSn
1kfTNkuCpuVKW79pa9TMli1btmzZsmXL1n9Wqc+/gNtn5RgZDrcVzHV4zIxxj//6nqwY/xGZXv/q
93P2DP2y0vhdMUO/7An9sudlTd/P7/SFrODx+7+QP5nfDOIL5rlNjytwHP18Rd7RfmRkFY4tZUy+
Ywq/YKnfHPNjDk3PMX6ZBrMcqU1NdWqoQ5Z7gRFL1JXzL2li+RyBxRZOafqbaVhju+urjGUp58Yp
4vMVucwit5VPfNbo6cfG543Eq6OxaoNb6TKNcKzSsMxA0HSN3qq/y+nDMVAawM/559I/qp/9/82W
LVu2bNmaWlJuBwzAIPwmOR6B/TAE30rO+bAe+mEXsO76QDJdcAdsga+kvCfhRSm/EX6XCrzwPnwv
uQUnAD7ul6VC3ilcAc/ATnhNuv5CuAzmQjWsgSdgk+Q5BRZIXt717pGs86AQFkE3DEv+TGnBfVIR
eQROhbNhHXwIn0vB06UQMYTYFyLn0CvwnnTDlbAcboeH4Cep+CQgxuJ7pRLyKr0HOqUy8ijDu2wO
cF7ZVXBYWpgNxHzjbGnRPGnx+dKSr6Xy76QK8qngWcXbUuVZEIYH4CUgtsoRKXIBmFJVBuBdRR+q
24C6LvVDMfwgRa8B6h7leQ0x1bwj1Z4LO6RYr1S/AX6UGsivgXfi10mNUdgI9KbxVdgLv0hN1C5x
GjADCeJPXA0LgbonHgN6k3gW3gTmI0GszXfBC4BXC/taOK/lGylJLZKclaRASTySz0utF8GlwLNW
4m7NAfrQRg/azgBybcuDh+Fx6ZZrYbfUTq/byam9DlYDzzroXQe97eB3x7vws3RrjbT8JqAuK1qk
lazfH5NWOeCotPp1aQ3xPJgAZnftiXCOtH67tIFzNxLLoyGpk3p3srdrJTBvXcxPNz3rrgBmq5s+
dRPXJuJ7ulXazH3YTF+39Ehb6dHW26QeatBDTXsZ9F7mddsb0nPksJ0a7Vgi7WSe+si5j/vS96vU
T136iau/S9pFb3bfLb3FndpzRBpYCsztAHM3QP33OqV95L8vDvR2EN9Bct9fKX3E748vAeb0AHfr
AOd+Qq5D9HyImf70YlgL1PAg+R3kLn7GO8PlgMcwMz28TRo5GejTSBAiwF0YuROIa+Qp6YtVcEg6
dLN0+EzpyJfSUWK3ZcuWLVu2/p/6A9x9I/E=""".decode("base64").decode("zlib")


class PgSphereDryTest(testhelpers.VerboseTest):
# Tests for pgsphere interface that don't need DB connectivity
# (others are in dbtest)
	def _assertCircleBecomesPolygon(self, alpha, delta, radius):
		alpha, delta, radius = alpha*DEG, delta*DEG, radius*DEG
		c = pgsphere.SCircle(pgsphere.SPoint(alpha, delta), radius)
		centerVec = utils.spherToCart(alpha, delta)
		for pt in c.asPoly().points:
			circleVec = utils.spherToCart(pt.x, pt.y)
			self.assertAlmostEqual(utils.spherDist(circleVec, centerVec), radius)
	
	def testCircle1AsPoly(self):
		self._assertCircleBecomesPolygon(0, 90, 3)

	def testCircle2AsPoly(self):
		self._assertCircleBecomesPolygon(0, 0, 8)

	def testCircle3AsPoly(self):
		self._assertCircleBecomesPolygon(80, -10, 20)

	def testCircle4AsPoly(self):
		self._assertCircleBecomesPolygon(120, -80, 1)

	def testCircle5AsPoly(self):
		self._assertCircleBecomesPolygon(220, -45, 1)

	def testCircle6AsPoly(self):
		self._assertCircleBecomesPolygon(320, 45, 90)

	def testPolyAsPoly(self):
		p = pgsphere.SPoly([pgsphere.SPoint(*p) for p in
			((0.5, 0.4), (1, -0.2), (1.5, 0))])
		self.failUnless(p.asPoly() is p)
	
	def testNormSboxAsPoly(self):
		b = pgsphere.SBox(pgsphere.SPoint(0.2, -0.5), pgsphere.SPoint(2, 0.1))
		self.assertEqual(b.asPoly(),
			pgsphere.SPoly([pgsphere.SPoint(*p) for p in
			((0.2, -0.5), (0.2, 0.1), (2, 0.1), (2, -0.5))]))

	def testInvSboxAsPoly(self):
		b = pgsphere.SBox(pgsphere.SPoint(2, 0.1), pgsphere.SPoint(-0.1, -0.5))
		self.assertEqual(b.asPoly(),
			pgsphere.SPoly([pgsphere.SPoint(*p) for p in
			((-0.1, -0.5), (-0.1, 0.1), (2, 0.1), (2, -0.5))]))

	def testBadMOCRandom(self):
		self.assertRaisesWithMsg(ValueError,
			"No order separator visible in MOC literal 'Habe nun Philosophie"
				"/Juristerei/Medic...'",
			pgsphere.SMoc.fromASCII,
			("Habe nun Philosophie/Juristerei/Medicin und leider ach Theologie",))

	def testBadMOCJunkAtStart(self):
		self.assertRaisesWithMsg(ValueError,
			"MOC literal 'Habe 1/3,4 3/45,9' does not start with order spec",
			pgsphere.SMoc.fromASCII,
			("Habe 1/3,4 3/45,9",))
	
	def testJunkInCenter(self):
		self.assertRaisesWithMsg(ValueError,
			"MOC literal syntax error at char 4",
			pgsphere.SMoc.fromASCII,
			("1/3, Habe, 4 3/45,9",))

	def testMOCConstruction(self):
		m = pgsphere.SMoc.fromASCII("1/1,3,4 2/4,25,12-14,21")
		self.assertEqual(m.moc[2], frozenset([4, 12, 13, 14, 21, 25]))

	def testFITSConstructor(self):
		m = pgsphere.SMoc.fromFITS(_SAMPLE_FITS_MOC)
		self.assertTrue(14400 in m.moc[6])

	def testToASCII(self):
		m = pgsphere.SMoc.fromASCII("1/1,3,4 2/4,25,12-14,21")
		self.assertEqual(m.asASCII(),
			"1/1,3-4 2/4,12-14,21,25")

	def testMocFromPoly(self):
		m = pgsphere.SPoly.fromDALI([0, 0, 45, 0, 0, 90]).asSMoc(order=4)
#		from pymoc.util.plot import plot_moc
#		res = plot_moc(m.moc, filename="zw.png", projection="moll")
		self.assertEqual(m.asASCII(),
			'1/2 2/2,14,71,77 3/2,14,50,62,279,283,305,317'
			' 4/2,14,50,62,194,206,242,254,1111,1115,1127,1131,1217,1229,1265,1277')
	
	def testMocFromCircle(self):
		m = pgsphere.SCircle.fromDALI([-50, 33, 2]).asPoly().asSMoc()
		self.assertEqual(m.asASCII(),
			'5/3309 6/13233-13235,13240-13241,13243-13246,13280,13282')


class KVLParseTest(testhelpers.VerboseTest):
# Tests for our key-value line format (as in postgres)
	def testNoQuote(self):
		self.assertEqual(utils.parseKVLine("anz=29"), {"anz": "29"})

	def testWhitespaceAroundEqual(self):
		self.assertEqual(utils.parseKVLine("a =29 bo= orz Unt = Lopt"), 
			{"a": "29", "bo": "orz", "Unt": "Lopt"})

	def testQuotedString(self):
		self.assertEqual(utils.parseKVLine(
			"simp='abc' a ='29' bo= 'orz' Unt = 'Lopt'"), 
			{"simp": "abc", "a": "29", "bo": "orz", "Unt": "Lopt"})

	def testWithBlanks(self):
		self.assertEqual(utils.parseKVLine(
			"name='Virtual Astrophysical' a=' 29'"),
			{"name": 'Virtual Astrophysical', "a": ' 29'})

	def testEscaping(self):
		self.assertEqual(utils.parseKVLine(
			r"form='f\'(x) = 2x^3' escChar='\\'"),
			{"form": "f'(x) = 2x^3", "escChar": '\\'})

	def testEmpty(self):
		self.assertEqual(utils.parseKVLine(
			"kram='' prokto=logic"),
			{"kram": "", "prokto": 'logic'})

	def testBadKey(self):
		self.assertRaisesWithMsg(utils.ParseException,
			"Expected Keyword (at char 0), (line:1, col:1)",
			utils.parseKVLine,
			("7ana=kram",))

	def testMissingEqual(self):
		self.assertRaisesWithMsg(utils.ParseException,
			'Expected "=" (at char 7), (line:1, col:8)',
			utils.parseKVLine,
			("yvakram",))

	def testBadValue(self):
		self.assertRaisesWithMsg(utils.ParseException,
			"Expected end of text (at char 7), (line:1, col:8)",
			utils.parseKVLine,
			("borken='novalue",))

	def testTooManyEquals(self):
		self.assertRaisesWithMsg(utils.ParseException,
			'Expected end of text (at char 14), (line:1, col:15)',
			utils.parseKVLine,
			("borken=novalue=ab",))


class KVLMakeTest(testhelpers.VerboseTest):
	def testWithKeywords(self):
		self.assertEqual(utils.makeKVLine({"ab": "cd", "honk": "foo"}),
			"ab=cd honk=foo")
	
	def testWithWeird(self):
		self.assertEqual(utils.makeKVLine({"ab": "c d", "honk": "foo=?"}),
			"ab='c d' honk='foo=?'")
	
	def testWithEscaping(self):
		self.assertEqual(
			utils.makeKVLine({"form": "f'(x) = 2x^3", "escChar": '\\'}),
			"escChar='\\\\' form='f\\'(x) = 2x^3'")
	
	def testFailsWithBadKey(self):
		self.assertRaisesWithMsg(ValueError,
			"'a form' not allowed as a key in key-value lines",
			utils.makeKVLine,
			({"a form": "f'(x) = 2x^3"},))


import grp
from gavo.base import osinter

class OSInterTest(testhelpers.VerboseTest):
	def testMakeSharedDir(self):
		path = os.path.join(base.getConfig("inputsDir"), "_dir_form_unit_test_")
		try:
			osinter.makeSharedDir(path, writable=True)
			stats = os.stat(path)
			self.assertEqual(stats.st_mode&0060, 060)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")

			os.chown(path, -1, os.getgid())
			osinter.makeSharedDir(path, writable=True)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")
		finally:
			os.rmdir(path)

	def testSwitchHTTP(self):
		self.assertEqual(
			osinter.switchProtocol("http://localhost:8080/bar/romp?hook#dork"),
			"https://localhost/bar/romp?hook#dork")

	def testSwitchHTTPS(self):
		self.assertEqual(
			osinter.switchProtocol("https://localhost/bar/romp?hook#dork"),
			"http://localhost:8080/bar/romp?hook#dork")

	def testSwitchOtherURL(self):
		self.assertRaises(ValueError,
			osinter.switchProtocol,
			"http://ivoa.net/documents")

	def testMailFormat(self):
		res = osinter.formatMail(u"""From: "Foo Bar\xdf" <foo@bar.com>
To: gnubbel@somewhere.org
Subject: Test Mail
X-Testing: Yes

This is normal text with shitty characters: '\xdf\xe4i\xdf\xe4'.

Send it, anyway.

Cheers,

       Foo.
""")
		self.assertTrue(isinstance(res, str))
		self.assertTrue("MIME-Version: 1.0" in res)
		self.assertTrue(" characters: '=C3=9F=" in res)
		self.assertTrue("From: =?utf-8?q?=22Foo_Bar=C3=9F=2" in res)
		self.assertTrue("X-Testing: Yes" in res)
		self.assertTrue("Subject: Test Mail" in res)
		self.assertTrue("To: gnubbel@somewhere.org" in res)
		self.assertTrue(re.search("Date: .*GMT", res))


import lxml
from gavo.helpers import testtricks

VALID_OAI = """<?xml-stylesheet href='/static/xsl/oai.xsl' type='text/xsl'?><oai:OAI-PMH xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/ http://vo.ari.uni-heidelberg.de/docs/schemata/OAI-PMH.xsd"><oai:responseDate>2014-07-21T15:13:09Z</oai:responseDate><oai:request verb="ListSets"/><oai:ListSets><oai:set><oai:setSpec>ivo_managed</oai:setSpec><oai:setName>ivo_managed</oai:setName></oai:set></oai:ListSets></oai:OAI-PMH>"""

class ValidatorTest(testhelpers.VerboseTest):
	def testSimpleValidator(self):
		val = testtricks.getJointValidator(["oai_dc.xsd", "OAI-PMH.xsd"])
		oaiTree = lxml.etree.fromstring(VALID_OAI)
		val.assertValid(oaiTree)

		lxml.etree.SubElement(oaiTree[2][0], "p")
		self.assertFalse(val(oaiTree))


class PgValidatorTest(testhelpers.VerboseTest):
	resources = [("conn", tresc.dbConnection)]

	def testBasicAcceptance(self):
		base.sqltypeToPgValidator("integer")("int4")
	
	def testBadRejecting(self):
		self.assertRaisesWithMsg(base.ConversionError,
			"No Postgres pg_types validators type for float",
			base.sqltypeToPgValidator,
			("float",))

	def testBasicRejecting(self):
		self.assertRaisesWithMsg(TypeError,
			"int4 is not compatible with a float column",
			base.sqltypeToPgValidator("real"),
			("int4",))

	def testArrayAcceptance(self):
		base.sqltypeToPgValidator("real[]")("_float8")

	def testNondbRejecting(self):
		self.assertRaisesWithMsg(TypeError,
			"Column with a non-db type file mapped to db type wurst",
			base.sqltypeToPgValidator("file"),
			("wurst",))

	def testPgSphereOk(self):
		rd = api.parseFromString(api.RD,
			"""<resource schema="test"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		rd.sourceId = "inline"
		table = api.TableForDef(rd.tables[0], connection=self.conn, create=True)
		try:
			table.ensureOnDiskMatches()  # raises DataError if test fails
		finally:
			self.conn.rollback()
			table.drop()

	def testPgSphereRaises(self):
		rd = api.parseFromString(api.RD,
			"""<resource schema="public"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		rd.sourceId = "inline"
		self.conn.execute("create table valtest (x integer)")
		table = api.TableForDef(rd.tables[0], connection=self.conn)
		try:
			self.assertRaisesWithMsg(api.DataError,
				"Table public.valtest: type mismatch in column x (Incompatible"
				" type in DB: Expected spoint, found int4)",
				table.ensureOnDiskMatches,
				())
		finally:
			self.conn.rollback()
			table.drop()
	

from gavo.user import validation

class GavoTableValTest(testhelpers.VerboseTest):
	class defaultArgs:
		compareDB = False

	def _getRdWithTable(self, columns):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="__test"><table id="totest" onDisk="True">
			%s</table></resource>"""%columns)
		rd.sourceId = "testing/q"
		return rd
	
	def _getValFuncOutput(self, func, rd):
		return testhelpers.captureOutput(func, (rd, self.defaultArgs))[1]

	def _getMessagesForColumns(self, columns):
		rd = self._getRdWithTable(columns)
		return self._getValFuncOutput(validation.validateTables, rd)

	def testReservedName(self):
		self.assertEqual(self._getMessagesForColumns(
			"<column name='distance'/>"),
			"[WARNING] testing/q: Column totest.distance: Name is not a regular ADQL identifier.\n")

	def testDelimitedId(self):
		self.assertEqual(self._getMessagesForColumns(
			"<column name='quoted/distance'/>"),
			"")


from gavo.web import examplesrender

class RSTExtensionTest(testhelpers.VerboseTest):
	def testWorkingExamples(self):
		ex = examplesrender._Example(
			base.META_CLASSES_FOR_KEYS["_example"](
				"Here is a :genparam:`par(example, no?)` "
				"example for genparam.""", title="Working genparam"))
		res = ex._getTranslatedHTML()
		self.assertTrue('<span property="generic-parameter" typeof="keyval"'
			in res)
		self.assertTrue('<span property="key" class="genparam-key">par</span>'
			in res)
		self.assertTrue('<span property="value" class="genparam-value">'
			'example, no?</span>' in res)
	
	def testFailingExample(self):
		ex = examplesrender._Example(
			base.META_CLASSES_FOR_KEYS["_example"]("Here is a :genparam:`parfoo` "
				"example for genparam.""", title="Working genparam"))
		res = ex._getTranslatedHTML()
		self.assertTrue("parfoo does not" in res)
		self.assertTrue('<span class="problematic"' in res)


from gavo.user import rdmanipulator

XML_SAMPLE = """
<?xml version="1.0"?>
<!-- opening comment -->

<root>
<weird:element-has_name att="bla's attribute"

	attB='
lots

  of whitespace'/>
  <![CDATA[
  <lots of messy></stuff>
  <p><em>ignored</em></p>
  ]]>
	<p style="foo:bar" class="upper"
		>  There is 
		<em>more</em> stuff	after the tab</p>
		<em>lonely m</em>
</root>
<!-- final comment -->
"""

class RDManiTest(testhelpers.VerboseTest):
	resources = [("ssaTable", tresc.ssaTestTable)]

	def testTransparent(self):
		self.assertEqual(XML_SAMPLE, rdmanipulator.processXML(
			XML_SAMPLE, rdmanipulator.Manipulator()))
	
	def testManipulation(self):
		class Manipulator(rdmanipulator.Manipulator):
			def __init__(self):
				self.sharp = False
				rdmanipulator.Manipulator.__init__(self)

			def gotElement(self, parseResult):
				if parseResult[0][1]=="p":
					parseResult = parseResult
					parseResult[0][2:2] = [" manipulated='True'"]

				elif parseResult[0][1]=="em":
					if self.hasParent("p"):
						parseResult = ["<em>much more", "</em>"]

				return parseResult

		res = rdmanipulator.processXML(XML_SAMPLE, Manipulator())
		self.assertTrue("\n\t<p manipulated='True' style=\"foo" in res,
			"manipulated missing")
		self.assertTrue("\t\t<em>much more</em>" in res,
			"em content not replaced")
		self.assertTrue("\t<em>lonely m</em>" in res,
			"lonely em replaced")
		self.assertTrue('<em>ignored</em>' in res)

	def testGetChanges(self):
		self.assertEqual(list(rdmanipulator.iterLimitsForTable(
			self.ssaTable.tableDef)), [
				('nrows', (u'hcdtest',  6)),
				('limits', (u'hcdtest', u'accsize', 213, 225)), 
				('limits', (u'hcdtest', u'ssa_redshift', -0.001, 0.7)),
				('limits', (u'hcdtest', u'ssa_timeExt', None, None)),
				('limits', (u'hcdtest', u'bogosity', None, None)),
			])

	def testGetChangedRD(self):
		res = rdmanipulator.getChangedRD(self.ssaTable.tableDef.rd.sourceId,
			rdmanipulator.iterLimitsForTable(
				self.ssaTable.tableDef))
		self.assertTrue('\t<column name="excellence" type="integer"'
			' description="random number">\n\t\t\t'
			'<values nullLiteral="-1"/>\n\t\t</column>'
			'\n\t\t<column original="accsize">\n\t\t\t'
			'<values \n\t\t\t\tmin="213" \n\t\t\t\tmax="225" nullLiteral="-1"/>'
			'\n\t\t</column>\n\t\t'
			'<column original="ssa_redshift">\n\t\t\t'
			'<values \n\t\t\t\tmin="-0.001" max="0.7"/>\n\t\t</column>\n'
			'\t\t<column original="ssa_timeExt">\n'
			'\t\t\t<values     max="0"     min="0"/>\n\t\t</column>\n'
			in res)

	def testFillingReplace(self):
		val = rdmanipulator.processXML(
			"<coverage><spatial>X</spatial></coverage>",
			rdmanipulator._ValuesChanger([
				("coverage", (None, "spatial", "0/1-2"))]))
		self.assertEqual(val,
			"<coverage><spatial>0/1-2</spatial></coverage>")

	def testFillingFill(self):
		val = rdmanipulator.processXML(
			"<coverage><spatial></spatial></coverage>",
			rdmanipulator._ValuesChanger([
				("coverage", (None, "spatial", "0/1-2"))]))
		self.assertEqual(val,
			"<coverage><spatial>0/1-2</spatial></coverage>")

	def testFillingEmpty(self):
		val = rdmanipulator.processXML(
			"<coverage><spatial/></coverage>",
			rdmanipulator._ValuesChanger([
				("coverage", (None, "spatial", "0/1-2"))]))
		self.assertEqual(val,
			"<coverage><spatial>0/1-2</spatial></coverage>")

	def testRefusingMultiCoverageUpdate(self):
		self.assertRaisesWithMsg(base.ReportableError,
			"Cannot replace coverage for axis 'spectral':"
			" unsupported previous content.",
			rdmanipulator.processXML,
			("<coverage><spectral>4 5</spectral>"
			"<spectral>5e-7 6e-7</spectral></coverage>",
			rdmanipulator._ValuesChanger([
				("coverage", (None, "spectral", "6e-7 4"))])))


def _getUpgraders():
	class To10Upgrader(upgrade.Upgrader):
		version = 9
		s_010_foo = "CREATE TABLE foo"
		u_010_fill_foo = "INSERT INTO foo"

	class To11Upgrader(upgrade.Upgrader):
		version = 10

		s_010_bar = "CREATE TABLE bar"

		u_010_fill_bar = "INSERT INTO bar"

		u_005_register_bar = "INSERT INTO metastore"
	
	class To20Upgrader(upgrade.Upgrader):
		version = 19

		s_005_addheck = "ALTER TABLE bar ADD COLUMN quux"
		s_010_addheck = "ALTER TABLE bar ADD COLUMN honk"
	
	return locals().values()


class UpgraderTest(testhelpers.VerboseTest):
	def testSchemachangeSort(self):
		statements = list(upgrade.iterStatements(9, 11, _getUpgraders()))
		self.assertEqual(statements[0], "CREATE TABLE foo")
		self.assertTrue(statements[2] is upgrade._COMMIT)
		self.assertEqual(statements[3], 'INSERT INTO foo')
		updateSchemaversion = statements[4]
		self.assertTrue("To10Upgrader" in repr(updateSchemaversion.im_self))
		self.assertEqual(len(statements), 10)

	def testHoleCovered(self):
		statements = list(upgrade.iterStatements(10, 20, _getUpgraders()))
		self.assertEqual(statements[2], "ALTER TABLE bar ADD COLUMN honk")
		self.assertTrue(statements[3] is upgrade._COMMIT)
		self.assertTrue("To20Upgrader" in repr(statements[-2].im_self))


if __name__=="__main__":
	testhelpers.main(KVLMakeTest)
