"""
Some unit tests not yet fitting anywhere else.
"""

#c Copyright 2008-2021, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import http.client
import io
import os
import re
import shutil
import sys
import tempfile
import unittest
from urllib import parse

import numpy

from gavo.helpers import testhelpers

from gavo import api
from gavo import base
from gavo import rscdesc
from gavo import votable
from gavo import stc
from gavo import utils
from gavo.user import upgrade
from gavo.helpers import filestuff
from gavo.helpers import processing
from gavo.utils import DEG
from gavo.utils import pyfits
from gavo.utils import pgsphere
from gavo.votable import tapquery

import tresc


class RenamerDryTest(unittest.TestCase):
	"""checks on the file renamer's planning that do not touch the file system.
	"""
	def testSerialization(self):
		"""clobbering renames must come out in the expected sequence.
		"""
		renamer = filestuff.FileRenamer({})
		expected = [('2', '3'), ('1', '2'), ('b', 'c'), ('a', 'b')]
		plan = renamer.makeRenameProc(
			{'a': 'b', 'b': 'c', '2': '3', '1': '2'})
		self.assertEqual(plan, expected)
	
	def testCycleDetection(self):
		"""a cyclic renaming recipe must raise filestuff.Error.
		"""
		renamer = filestuff.FileRenamer({})
		cyclicMap = {'a': 'b', 'b': 'c', 'c': 'a'}
		with self.assertRaises(filestuff.Error):
			renamer.makeRenameProc(cyclicMap)


class RenamerWetTest(unittest.TestCase):
	"""tests for behaviour of the file renamer on the file system.

	Each test runs in a fresh temporary directory populated with a few
	empty files; the directory is removed again in tearDown.
	"""
	# the renaming recipe both tests feed to FileRenamer.loadFromFile
	_renameSpec = ("a->b \nb->c\n 2->3\n1 ->2\n\n# a comment\n"
		"foo-> bar\n")

	def setUp(self):
		self.testDir = tempfile.mkdtemp("testrun")
		for fName in ["a.fits", "a.txt", "b.txt", "b.jpeg", "foo"]:
			# opening for writing creates the empty file, the with block
			# closes it right away.
			with open(os.path.join(self.testDir, fName), "w"):
				pass

	def tearDown(self):
		# Cleanup is best-effort.  We use ignore_errors rather than an
		# onerror callback: rmtree calls onerror with three arguments
		# (function, path, excinfo), so a one-argument callback would itself
		# raise a TypeError as soon as something goes wrong.
		shutil.rmtree(self.testDir, ignore_errors=True)

	def _loadRenamer(self):
		"""returns a FileRenamer built from _renameSpec.
		"""
		return filestuff.FileRenamer.loadFromFile(
			io.StringIO(self._renameSpec))

	def testOperation(self):
		"""tests an almost-realistic application
		"""
		self._loadRenamer().renameInPath(self.testDir)
		found = set(os.listdir(self.testDir))
		expected = {"b.fits", "b.txt", "c.txt", "c.jpeg", "bar"}
		self.assertEqual(found, expected)
	
	def testNoClobber(self):
		"""tests for effects of repeated application.
		"""
		f = self._loadRenamer()
		f.renameInPath(self.testDir)
		# a second run would have to overwrite files and must be refused
		self.assertRaises(filestuff.Error, f.renameInPath, self.testDir)


class TimeCalcTest(testhelpers.VerboseTest):
	"""tests for the conversions between julian/besselian years, JDs,
	and datetimes in stc.
	"""
	def testJYears(self):
		for jYear, expected in [
				(1991.25, datetime.datetime(1991, 4, 2, 13, 30, 0)),
				(2005.0, datetime.datetime(2004, 12, 31, 18, 0))]:
			self.assertEqual(stc.jYearToDateTime(jYear), expected)
	
	def testRoundtrip(self):
		# julian year -> datetime -> julian year in steps of 1/1000 yr
		for step in range(2000):
			jYear = 2010+step/1000.
			roundtripped = stc.dateTimeToJYear(stc.jYearToDateTime(jYear))
			self.assertAlmostEqual(jYear, roundtripped, 7,
				"Botched %f"%jYear)

	def testBYears(self):
		expected = datetime.datetime(1949, 12, 31, 22, 9, 46, 861900)
		self.assertEqual(stc.bYearToDateTime(1950.0), expected)
	
	def testBRoundtrip(self):
		# besselian year -> datetime -> besselian year in steps of 1/1000 yr
		for step in range(2000):
			bYear = 1950+step/1000.
			roundtripped = stc.dateTimeToBYear(stc.bYearToDateTime(bYear))
			self.assertAlmostEqual(bYear, roundtripped, 7,
				"Botched %f"%bYear)

	def testBesselLieske(self):
		"""check examples from Lieske, A&A 73, 282.
		"""
		samples = [
			(1899.999142, 1900),
			(1900., 1900.000858),
			(1950., 1949.999790),
			(1950.000210, 1950.0),
			(2000.0, 1999.998722),
			(2000.001278, 2000.0)]
		for bessel, julian in samples:
			converted = stc.dateTimeToJYear(stc.bYearToDateTime(bessel))
			self.assertAlmostEqual(converted, julian, places=5)

	def testJDDateTime(self):
		asDateTime = stc.jdnToDateTime(2458383.5)
		self.assertEqual(utils.formatISODT(asDateTime),
			"2018-09-22T00:00:00Z")


class FITSProcessorTest(testhelpers.VerboseTest):
	"""tests for some aspects of helper file processors.

	Unfortunately, sequencing is important here, so we do it all in
	one test.  I guess one should rethink things here, but for now let's
	keep things simple.
	"""
	_rdText = """<resource schema="filetest"><data id="import">
		<sources pattern="*.fits"/><fitsProdGrammar/>
		</data></resource>"""

	# the last status line we expect from procmain when all went well
	_allProcessed = "10 files processed, 0 files with errors"

	def _writeFITS(self, destPath, seed):
		"""writes a FITS file with a 2x(seed+1) int16 image and a few
		seed-dependent header cards to destPath.
		"""
		hdu = pyfits.PrimaryHDU(numpy.zeros((2,seed+1), 'i2'))
		hdu.header.set("SEED", seed, "initial number")
		hdu.header.set("WEIRD", "W"*seed)
		hdu.header.set("RECIP", 1./(1+seed))
		hdu.writeto(destPath)

	def setUp(self):
		# The sub-tests overwrite sys.argv; remember the original so
		# tearDown can put it back and tests running later are not confused.
		self.origArgv = sys.argv
		self.resdir = os.path.join(base.getConfig("tempDir"), "filetest")
		self.origInputs = base.getConfig("inputsDir")
		base.setConfig("inputsDir", base.getConfig("tempDir"))
		if os.path.exists(self.resdir): # Leftover from previous run?
			return
		os.mkdir(self.resdir)
		for i in range(10):
			self._writeFITS(os.path.join(self.resdir, "src%d.fits"%i), i)
		with open(os.path.join(self.resdir, "filetest.rd"), "w") as f:
			f.write(self._rdText)

	def tearDown(self):
		sys.argv = self.origArgv
		base.setConfig("inputsDir", self.origInputs)
		shutil.rmtree(self.resdir, True)

	class SimpleProcessor(processing.HeaderProcessor):
		"""a header processor adding a SQUARE card computed from SEED;
		it counts the headers it has built in headersBuilt.
		"""
		def __init__(self, *args, **kwargs):
			processing.HeaderProcessor.__init__(self, *args, **kwargs)
			self.headersBuilt = 0

		def _isProcessed(self, srcName):
			return "SQUARE" in self.getPrimaryHeader(srcName)

		def _getHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**2)
			self.headersBuilt += 1
			return hdr

	def _getHeader(self, srcName):
		"""returns the primary header of the FITS file srcName within resdir.
		"""
		hdus = pyfits.open(os.path.join(self.resdir, srcName))
		hdr = hdus[0].header
		hdus.close()
		return hdr

	def _runProcessor(self):
		"""runs procmain for SimpleProcessor on the import data of the
		test RD and returns what captureOutput returns for that; the
		sub-tests unpack this as (processor, stdout, stderr).

		procmain reads sys.argv, so set that before calling this.
		"""
		return testhelpers.captureOutput(processing.procmain,
			(self.SimpleProcessor, "filetest/filetest", "import"))

	def _assertAllProcessed(self, stdout):
		"""asserts that the last status line in stdout reports ten files
		processed without errors.
		"""
		# status lines are separated by carriage returns; only the last
		# one is checked
		self.assertEqual(stdout.split('\r')[-1].strip(), self._allProcessed)

	def _testPlainRun(self):
		# procmain reads argv, don't confuse it
		sys.argv = ["test", "--bail"]
		# Normal run, no headers present yet
		proc, stdout, errs = self._runProcessor()
		self.assertEqual(errs, "")
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 10)
		self.assertTrue(os.path.exists(
			os.path.join(self.resdir, "src9.fits.hdr")))
		self.assertTrue("SQUARE" in self._getHeader("src9.fits.hdr"))
		# we don't run with applyHeaders here
		self.assertFalse("SQUARE" in self._getHeader("src9.fits"))

	def _testRespectCaches(self):
		"""tests that no processing is done when cached headers are there.

		This needs to run after _testPlainRun.
		"""
		# sys.argv is deliberately left as _testPlainRun set it
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 0)
	
	def _testNoCompute(self):
		"""tests that no computations take place with --no-compute.
		"""
		sys.argv = ["misctest.py", "--no-compute"]
		# remove one cached header so there would be something to compute
		os.unlink(os.path.join(self.resdir, "src4.fits.hdr"))
		proc, stdout, _ = self._runProcessor()
		self.assertEqual(proc.headersBuilt, 0)

	def _testRecompute(self):
		"""tests that missing headers are recomputed.

		This needs to run before _testApplyCaches and after _testNoCompute.
		"""
		sys.argv = ["misctest.py"]
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 1)

	def _testApplyCaches(self):
		"""tests the application of headers to sources.

		This needs to run after _testPlainRun
		"""
		sys.argv = ["misctest.py", "--apply"]
		proc, stdout, _ = self._runProcessor()
		self._assertAllProcessed(stdout)
		self.assertEqual(proc.headersBuilt, 0)
		self.assertTrue("SQUARE" in self._getHeader("src9.fits"))
		# see if data survived
		hdus = pyfits.open(os.path.join(self.resdir, "src9.fits"))
		try:
			# look at the data while the file is still open
			self.assertEqual(hdus[0].data.shape, (2, 10))
		finally:
			hdus.close()
	
	def _testForcedRecompute(self):
		"""tests for working --reprocess.
		"""
		sys.argv = ["misctest.py", "--reprocess"]
		proc, stdout, _ = self._runProcessor()
		self.assertEqual(proc.headersBuilt, 10)

	def _testBugfix(self):
		"""tests that reprocessing picks up a changed header computation.

		SimpleProcessor._getHeader is monkeypatched for the duration
		of this test and restored afterwards.
		"""
		def newGetHeader(self, srcName):
			hdr = self.getPrimaryHeader(srcName)
			hdr.set("SQUARE", hdr["SEED"]**3)
			self.headersBuilt += 1
			return hdr

		# NOTE(review): this test was documented as exercising
		# "--reprocess --apply", and argv used to be set to that first, but
		# it was then overwritten with the value below before procmain
		# ran.  The effective command line is kept; confirm whether
		# --apply should be in here.
		sys.argv = ["misctest.py", "--reprocess"]
		origGetHeader = self.SimpleProcessor._getHeader
		self.SimpleProcessor._getHeader = newGetHeader
		try:
			proc, stdout, _ = self._runProcessor()
			# 6**3; the unpatched processor would have written 36
			self.assertEqual(
				self._getHeader("src6.fits.hdr")["SQUARE"], 216)
		finally:
			self.SimpleProcessor._getHeader = origGetHeader

	def testAll(self):
		# the sub-tests build on each other's files, so order matters
		self._testPlainRun()
		self._testRespectCaches()
		self._testNoCompute()
		self._testRecompute()
		self._testForcedRecompute()
		self._testApplyCaches()
		self._testForcedRecompute()
		self._testBugfix()


class SpectrumPreviewTest(testhelpers.VerboseTest):
	"""a test running an api.SpectralPreviewMaker subclass for one fake
	identifier and inspecting the file it writes.
	"""
	def testBuildSpectrum(self):
		previewPath = os.path.join(
			api.getConfig("inputsDir"), "ssa-nopreviews", "fake-preview")
		# get rid of whatever an earlier run may have left
		if os.path.exists(previewPath):
			os.unlink(previewPath)
			
		class PreviewMaker(api.SpectralPreviewMaker):
			sdmId = "datamaker"

			def iterIdentifiers(self):
				yield "this is not actually an identifer, just unit test stuff"

			def getPreviewPath(self, accref):
				# all previews go to the one file inspected below
				return previewPath

		# stand-in for the parsed command line options
		class opts:
			nParallel = 1
			doReport = False
			force = True
			bailOnError = True
			requireFrag = None

		dd = api.resolveCrossId("data/ssatest#test_import")
		maker = PreviewMaker(opts, dd)
		rtval, stdout, stderr = testhelpers.captureOutput(maker.processAll)

		self.assertEqual(rtval, (1, 0))
		self.assertEqual(stderr, "")
		
		with open(previewPath, "rb") as f:
			prevData = f.read()

		expectedFragment = b"IDATx\x9c\xed\xda\xbb\r\x83@\x10\x84"
		self.assertIn(expectedFragment, prevData,
			"Generated spectrum preview doesn't contain expected data")


@contextlib.contextmanager
def _fakeHTTPLib(respData="", respStatus=200, 
		mime="application/x-votable", exception=None):
	"""runs a test with a fake httplib connection maker.

	This is for TapquerySyncTest and similar.
	"""
	class FakeResult(object):
		status = respStatus

		def getheader(self, key):
			if key.lower()=='content-type':
				return mime
			else:
				self.fail()

		def read(self):
			return respData

	class FakeInfo(object):
		pass

	class FakeConnection(object):
		def __init__(self, *args, **kwargs):
			pass

		def request(self, method, path, data, headers):
			FakeInfo.lastData = data
			if exception is not None:
				raise exception

		def getresponse(self, *args, **kwargs):
			return FakeResult()

		def close(self):
			pass
	
	origConn = http.client.HTTPConnection
	http.client.HTTPConnection = FakeConnection
	try:
		yield FakeInfo
	finally:
		http.client.HTTPConnection = origConn


class TapquerySyncTest(testhelpers.VerboseTest):
	"""Tests for the tapquery sync object.

	Since TAP queries are expensive, we only test things that don't
	actually run a query.  For more extensive exercising, see the taptest
	RD at the GAVO DC.
	"""
	endpoint = "http://dachstest"

	def _makeJob(self, query="select * from tap_schema.tables", **kwargs):
		# all tests here talk to the same (fake) endpoint
		return votable.ADQLSyncJob(self.endpoint, query, **kwargs)

	def testNoResult(self):
		job = self._makeJob()
		self.assertRaisesWithMsg(tapquery.Error,
			"No result in so far",
			job.openResult,
			())

	def testWrongStatus(self):
		with _fakeHTTPLib(respData=b"oops", respStatus=404):
			job = self._makeJob()
			with self.assertRaises(tapquery.WrongStatus):
				job.start()
			self.assertEqual(job.getErrorFromServer(), "oops")

	def testHTTPError(self):
		import socket
		failure = socket.error("timeout")
		with _fakeHTTPLib(respData="oops", exception=failure):
			job = self._makeJob()
			with self.assertRaises(tapquery.NetworkError):
				job.start()
			self.assertEqual(job.getErrorFromServer(), 
				'Problem connecting to dachstest (timeout)')

	def testTAPError(self):
		errorDocument = """<VOTABLE version="1.2" xmlns:vot="http://www.ivoa.net/xml/VOTable/v1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.ivoa.net/xml/VOTable/v1.2 http://vo.ari.uni-heidelberg.de/docs/schemata/VOTable-1.2.xsd"><RESOURCE type="results"><INFO name="QUERY_STATUS" value="ERROR">Could not parse your query: Expected "SELECT" (at char 0), (line:1, col:1)</INFO></RESOURCE></VOTABLE>"""
		with _fakeHTTPLib(respData=errorDocument, respStatus=400):
			job = self._makeJob("selct * from tap_schema.tables")
			with self.assertRaises(tapquery.WrongStatus):
				job.start()
			self.assertEqual(job.getErrorFromServer(), 
				'Could not parse your query: Expected "SELECT" (at char 0),'
				' (line:1, col:1)')

	def testConstructionParameters(self):
		with _fakeHTTPLib(respData=b"ok") as fakeInfo:
			job = self._makeJob(userParams={"MAXREC": 0})
			job.start()
			sentParams = parse.parse_qs(fakeInfo.lastData)
			self.assertEqual(sentParams["MAXREC"], ["0"])
		
	def testLaterParameters(self):
		with _fakeHTTPLib(respData=b"ok") as fakeInfo:
			job = self._makeJob()
			job.setParameter("MAXREC", 0)
			job.start()
			sentParams = parse.parse_qs(fakeInfo.lastData)
			self.assertEqual(sentParams["MAXREC"], ["0"])


# A sample FITS MOC for PgSphereDryTest.testFITSConstructor; it was taken from
# http://alasky.u-strasbg.fr/footprints/cats/vizier/I/206?product=MOC&nside=64
_SAMPLE_FITS_MOC = utils.getDirtyBytes("""
eJztl2tMW2Ucxh810WhijNH4QY05iRq2qEOI2eYFI20PUGxPO9rKZbqtQGFFKKNQKAiJTjeNt206
3eYE4m2aZYOp87YpxGxeolHHFplm4gSdGjM1Xpbsg7dfwUQyKQRcjInnSX4f+p7zPu//9ianAbfX
7zENI8eYQEEj0yiOR5uaIjGjvNXw+pxGTbg5bOT63cYs/5y8SDwWbUhEZo/b4nAH/e6SNH7zJ1qc
XFZuiTtgpPG7fPp+Bf6SVBoT+80l3wIz1+OPJg1vorYp6otXRuKGs745Eg9XR4y68DJjVl04adSn
1kfTNkuCpuVKW79pa9TMli1btmzZsmXL1n9Wqc+/gNtn5RgZDrcVzHV4zIxxj//6nqwY/xGZXv/q
93P2DP2y0vhdMUO/7An9sudlTd/P7/SFrODx+7+QP5nfDOIL5rlNjytwHP18Rd7RfmRkFY4tZUy+
Ywq/YKnfHPNjDk3PMX6ZBrMcqU1NdWqoQ5Z7gRFL1JXzL2li+RyBxRZOafqbaVhju+urjGUp58Yp
4vMVucwit5VPfNbo6cfG543Eq6OxaoNb6TKNcKzSsMxA0HSN3qq/y+nDMVAawM/559I/qp/9/82W
LVu2bNmaWlJuBwzAIPwmOR6B/TAE30rO+bAe+mEXsO76QDJdcAdsga+kvCfhRSm/EX6XCrzwPnwv
uQUnAD7ul6VC3ilcAc/ATnhNuv5CuAzmQjWsgSdgk+Q5BRZIXt717pGs86AQFkE3DEv+TGnBfVIR
eQROhbNhHXwIn0vB06UQMYTYFyLn0CvwnnTDlbAcboeH4Cep+CQgxuJ7pRLyKr0HOqUy8ijDu2wO
cF7ZVXBYWpgNxHzjbGnRPGnx+dKSr6Xy76QK8qngWcXbUuVZEIYH4CUgtsoRKXIBmFJVBuBdRR+q
24C6LvVDMfwgRa8B6h7leQ0x1bwj1Z4LO6RYr1S/AX6UGsivgXfi10mNUdgI9KbxVdgLv0hN1C5x
GjADCeJPXA0LgbonHgN6k3gW3gTmI0GszXfBC4BXC/taOK/lGylJLZKclaRASTySz0utF8GlwLNW
4m7NAfrQRg/azgBybcuDh+Fx6ZZrYbfUTq/byam9DlYDzzroXQe97eB3x7vws3RrjbT8JqAuK1qk
lazfH5NWOeCotPp1aQ3xPJgAZnftiXCOtH67tIFzNxLLoyGpk3p3srdrJTBvXcxPNz3rrgBmq5s+
dRPXJuJ7ulXazH3YTF+39Ehb6dHW26QeatBDTXsZ9F7mddsb0nPksJ0a7Vgi7WSe+si5j/vS96vU
T136iau/S9pFb3bfLb3FndpzRBpYCsztAHM3QP33OqV95L8vDvR2EN9Bct9fKX3E748vAeb0AHfr
AOd+Qq5D9HyImf70YlgL1PAg+R3kLn7GO8PlgMcwMz28TRo5GejTSBAiwF0YuROIa+Qp6YtVcEg6
dLN0+EzpyJfSUWK3ZcuWLVu2/p/6A9x9I/E=""")


class PgSphereDryTest(testhelpers.VerboseTest):
	"""Tests for the pgsphere interface that don't need DB connectivity
	(others are in dbtest).
	"""
	def _assertCircleBecomesPolygon(self, alpha, delta, radius):
		# arguments are in degrees; pgsphere and the spherical helpers
		# get them multiplied with DEG
		alpha, delta, radius = alpha*DEG, delta*DEG, radius*DEG
		centerVec = utils.spherToCart(alpha, delta)
		circle = pgsphere.SCircle(pgsphere.SPoint(alpha, delta), radius)
		# every vertex of the polygon must be one radius from the center
		for vertex in circle.asPoly().points:
			vertexVec = utils.spherToCart(vertex.x, vertex.y)
			self.assertAlmostEqual(
				utils.spherDist(vertexVec, centerVec), radius)
	
	def testCircle1AsPoly(self):
		self._assertCircleBecomesPolygon(0, 90, 3)

	def testCircle2AsPoly(self):
		self._assertCircleBecomesPolygon(0, 0, 8)

	def testCircle3AsPoly(self):
		self._assertCircleBecomesPolygon(80, -10, 20)

	def testCircle4AsPoly(self):
		self._assertCircleBecomesPolygon(120, -80, 1)

	def testCircle5AsPoly(self):
		self._assertCircleBecomesPolygon(220, -45, 1)

	def testCircle6AsPoly(self):
		self._assertCircleBecomesPolygon(320, 45, 90)

	def testPolyAsPoly(self):
		vertices = [pgsphere.SPoint(*coos) for coos in
			((0.5, 0.4), (1, -0.2), (1.5, 0))]
		poly = pgsphere.SPoly(vertices)
		# a polygon must hand back itself, not a copy
		self.assertIs(poly.asPoly(), poly)
	
	def testNormSboxAsPoly(self):
		box = pgsphere.SBox(
			pgsphere.SPoint(0.2, -0.5), pgsphere.SPoint(2, 0.1))
		expected = pgsphere.SPoly([pgsphere.SPoint(*coos) for coos in
			((0.2, -0.5), (0.2, 0.1), (2, 0.1), (2, -0.5))])
		self.assertEqual(box.asPoly(), expected)

	def testInvSboxAsPoly(self):
		box = pgsphere.SBox(
			pgsphere.SPoint(2, 0.1), pgsphere.SPoint(-0.1, -0.5))
		expected = pgsphere.SPoly([pgsphere.SPoint(*coos) for coos in
			((-0.1, -0.5), (-0.1, 0.1), (2, 0.1), (2, -0.5))])
		self.assertEqual(box.asPoly(), expected)

	def testBadMOCRandom(self):
		literal = ("Habe nun Philosophie/Juristerei/Medicin und leider"
			" ach Theologie")
		self.assertRaisesWithMsg(ValueError,
			"No order separator visible in MOC literal 'Habe nun Philosophie"
				"/Juristerei/Medic...'",
			pgsphere.SMoc.fromASCII,
			(literal,))

	def testBadMOCJunkAtStart(self):
		literal = "Habe 1/3,4 3/45,9"
		self.assertRaisesWithMsg(ValueError,
			"MOC literal 'Habe 1/3,4 3/45,9' does not start with order spec",
			pgsphere.SMoc.fromASCII,
			(literal,))
	
	def testJunkInCenter(self):
		literal = "1/3, Habe, 4 3/45,9"
		self.assertRaisesWithMsg(ValueError,
			"MOC literal syntax error at char 4",
			pgsphere.SMoc.fromASCII,
			(literal,))

	def testMOCConstruction(self):
		moc = pgsphere.SMoc.fromASCII("1/1\n3 4 2/4 25 12-14 21")
		self.assertEqual(moc.moc[2], frozenset([4, 12, 13, 14, 21, 25]))

	def testFITSConstructor(self):
		moc = pgsphere.SMoc.fromFITS(_SAMPLE_FITS_MOC)
		self.assertIn(14400, moc.moc[6])

	def testToASCII(self):
		moc = pgsphere.SMoc.fromASCII("1/1,3,4 2/4,25,12-14,21")
		self.assertEqual(moc.asASCII(),
			"1/1 3-4 2/4 12-14 21 25")

	def testMocFromPoly(self):
		poly = pgsphere.SPoly.fromDALI([0, 0, 45, 0, 0, 90])
		moc = poly.asSMoc(order=4)
		self.assertEqual(moc.asASCII(),
			'1/2 2/2 14 71 77 3/2 14 50 62 279 283 305 317'
			' 4/2 14 50 62 194 206 242 254 1111 1115 1127 1131 1217 1229 1265 1277')
	
	def testMocFromCircle(self):
		circle = pgsphere.SCircle.fromDALI([-50, 33, 2])
		moc = circle.asPoly().asSMoc()
		self.assertEqual(moc.asASCII(),
			'5/3309 6/13233-13235 13240-13241 13243-13246 13280 13282')

	def testMocFromCircleInclusive(self):
		circle = pgsphere.SCircle.fromDALI([-50, 33, 2])
		moc = circle.asPoly().asSMoc(inclusive=True)
		# to have a look at the result, the following has been used:
		#		from pymoc.util.plot import plot_moc
		#		res = plot_moc(moc.moc, filename="zw.png", projection="moll")
		self.assertEqual(moc.asASCII(),
			'4/827 6/13211 13214-13215 13258 13280 13282 13288 13290 14609 14612')

	def testShortPolyDALIConstructor(self):
		tooFew = [1, 1, 2, 2]
		self.assertRaisesWithMsg(ValueError,
			"Need an even-numbered number (>=6) of floats in a DALI polygon represenation, got 4 floats.",
			pgsphere.SPoly.fromDALI,
			(tooFew,))

	def testOddPolyDALIConstructor(self):
		# seven floats: the last point lacks its second coordinate
		oddCoos = [
			-1.5372606267401516, 85.26415832196923,
			-1.5372606267401516, 85.25833260227361,
			-1.5401725504711685, 85.25541973647518,
			-1.5416285063290134]
		self.assertRaisesWithMsg(ValueError,
			"Need an even-numbered number (>=6) of floats in a DALI polygon represenation, got 7 floats.",
			pgsphere.SPoly.fromDALI,
			(oddCoos,))

	def testCoveringCircle(self):
		points = [pgsphere.SPoint.fromDegrees(lon, lat)
			for lon, lat in [(2, 3), (355, -9), (0, 0)]]
		covering = pgsphere.SCircle.fromPointSet(points)
		self.assertEqual(covering.asSTCS("X"), 
			"Circle X 359.0157337607 -1.9990620034 6.2881158141")

	def testCircleSMoc(self):
		circle = pgsphere.SCircle.fromDALI([250, -79, 0.25])
		moc = circle.asSMoc(order=7)
		self.assertEqual(moc.asSTCS("X"), "MOC 7/164157")


class KVLParseTest(testhelpers.VerboseTest):
	"""Tests for parsing our key-value line format (as in postgres).
	"""
	def _assertParsesTo(self, kvLine, expected):
		# parseKVLine must turn kvLine into the dictionary expected
		self.assertEqual(utils.parseKVLine(kvLine), expected)

	def _assertParseFails(self, kvLine, message):
		# parseKVLine must raise a ParseException with exactly message
		self.assertRaisesWithMsg(utils.ParseException,
			message,
			utils.parseKVLine,
			(kvLine,))

	def testNoQuote(self):
		self._assertParsesTo("anz=29", {"anz": "29"})

	def testWhitespaceAroundEqual(self):
		self._assertParsesTo("a =29 bo= orz Unt = Lopt",
			{"a": "29", "bo": "orz", "Unt": "Lopt"})

	def testQuotedString(self):
		self._assertParsesTo(
			"simp='abc' a ='29' bo= 'orz' Unt = 'Lopt'",
			{"simp": "abc", "a": "29", "bo": "orz", "Unt": "Lopt"})

	def testWithBlanks(self):
		self._assertParsesTo(
			"name='Virtual Astrophysical' a=' 29'",
			{"name": 'Virtual Astrophysical', "a": ' 29'})

	def testEscaping(self):
		self._assertParsesTo(
			r"form='f\'(x) = 2x^3' escChar='\\'",
			{"form": "f'(x) = 2x^3", "escChar": '\\'})

	def testEmpty(self):
		self._assertParsesTo(
			"kram='' prokto=logic",
			{"kram": "", "prokto": 'logic'})

	def testBadKey(self):
		self._assertParseFails("7ana=kram",
			"Expected Keyword, found '7'  (at char 0), (line:1, col:1)")

	def testMissingEqual(self):
		self._assertParseFails("yvakram",
			'Expected "=", found end of text  (at char 7), (line:1, col:8)')

	def testBadValue(self):
		self._assertParseFails("borken='novalue",
			"Expected end of text, found \"'\"  (at char 7), (line:1, col:8)")

	def testTooManyEquals(self):
		self._assertParseFails("borken=novalue=ab",
			'Expected end of text, found \'=\'  (at char 14), (line:1, col:15)')


class KVLMakeTest(testhelpers.VerboseTest):
	"""Tests for serialising dictionaries into key-value lines.
	"""
	def testWithKeywords(self):
		kvLine = utils.makeKVLine({"ab": "cd", "honk": "foo"})
		self.assertEqual(kvLine, "ab=cd honk=foo")
	
	def testWithWeird(self):
		kvLine = utils.makeKVLine({"ab": "c d", "honk": "foo=?"})
		self.assertEqual(kvLine, "ab='c d' honk='foo=?'")
	
	def testWithEscaping(self):
		pairs = {"form": "f'(x) = 2x^3", "escChar": '\\'}
		self.assertEqual(
			utils.makeKVLine(pairs),
			"escChar='\\\\' form='f\\'(x) = 2x^3'")
	
	def testFailsWithBadKey(self):
		badPairs = {"a form": "f'(x) = 2x^3"}
		self.assertRaisesWithMsg(ValueError,
			"'a form' not allowed as a key in key-value lines",
			utils.makeKVLine,
			(badPairs,))


import grp
from gavo.base import osinter

class OSInterTest(testhelpers.VerboseTest):
	"""tests for helpers in gavo.base.osinter: shared directories,
	http/https switching, and mail formatting.
	"""
	def testMakeSharedDir(self):
		path = os.path.join(base.getConfig("inputsDir"), "_dir_form_unit_test_")
		try:
			osinter.makeSharedDir(path, writable=True)
			stats = os.stat(path)
			# group must be able to read and write
			self.assertEqual(stats.st_mode&0o060, 0o60)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")

			# hand the directory to our own group and see if makeSharedDir
			# gives it back to gavo
			os.chown(path, -1, os.getgid())
			osinter.makeSharedDir(path, writable=True)
			# stat again: the result from above predates the chown and
			# would make this assertion pass whatever makeSharedDir did.
			stats = os.stat(path)
			self.assertEqual(grp.getgrgid(stats.st_gid).gr_name, "gavo")
		finally:
			# when the directory never came into being, don't hide the
			# actual failure behind an error from rmdir
			if os.path.isdir(path):
				os.rmdir(path)

	def testSwitchHTTP(self):
		self.assertEqual(
			osinter.switchProtocol("http://localhost:8080/bar/romp?hook#dork"),
			"https://localhost/bar/romp?hook#dork")

	def testSwitchHTTPS(self):
		self.assertEqual(
			osinter.switchProtocol("https://localhost/bar/romp?hook#dork"),
			"http://localhost:8080/bar/romp?hook#dork")

	def testSwitchOtherURL(self):
		# URLs not pointing to our own server must be rejected
		self.assertRaises(ValueError,
			osinter.switchProtocol,
			"http://ivoa.net/documents")

	def testMailFormat(self):
		# the input has non-ASCII characters in both a header and the body
		res = osinter.formatMail("""From: "Foo Bar\xdf" <foo@bar.com>
To: gnubbel@somewhere.org
Subject: Test Mail
X-Testing: Yes

This is normal text with shitty characters: '\xdf\xe4i\xdf\xe4'.

Send it, anyway.

Cheers,

       Foo.
""")
		self.assertTrue(isinstance(res, str))
		self.assertTrue("MIME-Version: 1.0" in res)
		self.assertTrue(" characters: '=C3=9F=" in res)
		self.assertTrue("From: =?utf-8?q?=22Foo_Bar=C3=9F=2" in res)
		self.assertTrue("X-Testing: Yes" in res)
		self.assertTrue("Subject: Test Mail" in res)
		self.assertTrue("To: gnubbel@somewhere.org" in res)
		self.assertTrue(re.search("Date: .*GMT", res))


import lxml
from gavo.helpers import testtricks

# An OAI-PMH ListSets response that ValidatorTest expects to validate.
VALID_OAI = """<?xml-stylesheet href='/static/xsl/oai.xsl' type='text/xsl'?><oai:OAI-PMH xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/ http://vo.ari.uni-heidelberg.de/docs/schemata/OAI-PMH.xsd"><oai:responseDate>2014-07-21T15:13:09Z</oai:responseDate><oai:request verb="ListSets"/><oai:ListSets><oai:set><oai:setSpec>ivo_managed</oai:setSpec><oai:setName>ivo_managed</oai:setName></oai:set></oai:ListSets></oai:OAI-PMH>"""

class ValidatorTest(testhelpers.VerboseTest):
	"""tests for the XSD validators built by testtricks.
	"""
	def testSimpleValidator(self):
		# A plain "import lxml" does not load the etree submodule;
		# lxml.etree then only resolves if some other module happens to
		# have imported it.  Import it explicitly where it is used.
		from lxml import etree

		val = testtricks.getJointValidator(["oai_dc.xsd", "OAI-PMH.xsd"])
		oaiTree = etree.fromstring(VALID_OAI)
		val.assertValid(oaiTree)

		# an undeclared child element must make the document invalid
		etree.SubElement(oaiTree[2][0], "p")
		self.assertFalse(val(oaiTree))


class PgValidatorTest(testhelpers.VerboseTest):
	"""tests for matching declared column types against postgres types.
	"""
	resources = [("conn", tresc.dbConnection)]

	def _parseInlineRD(self, rdSource):
		# returns the RD parsed from rdSource with a made-up source id
		rd = api.parseFromString(api.RD, rdSource)
		rd.sourceId = "inline"
		return rd

	def testBasicAcceptance(self):
		validate = base.sqltypeToPgValidator("integer")
		validate("int4")
	
	def testBadRejecting(self):
		self.assertRaisesWithMsg(base.ConversionError,
			"No Postgres pg_types validators type for float",
			base.sqltypeToPgValidator,
			("float",))

	def testBasicRejecting(self):
		validate = base.sqltypeToPgValidator("real")
		self.assertRaisesWithMsg(TypeError,
			"int4 is not compatible with a float column",
			validate,
			("int4",))

	def testArrayAcceptance(self):
		validate = base.sqltypeToPgValidator("real[]")
		validate("_float8")

	def testNondbRejecting(self):
		validate = base.sqltypeToPgValidator("file")
		self.assertRaisesWithMsg(TypeError,
			"Column with a non-db type file mapped to db type wurst",
			validate,
			("wurst",))

	def testPgSphereOk(self):
		rd = self._parseInlineRD(
			"""<resource schema="test"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		table = api.TableForDef(rd.tables[0], connection=self.conn, create=True)
		try:
			table.ensureOnDiskMatches()  # raises DataError if test fails
		finally:
			self.conn.rollback()
			table.drop()

	def testPgSphereRaises(self):
		rd = self._parseInlineRD(
			"""<resource schema="public"><table id="valtest" onDisk="true">
				<column name="x" type="spoint"/></table></resource>""")
		# the table on disk gets a type differing from the declared one
		self.conn.execute("create table valtest (x integer)")
		table = api.TableForDef(rd.tables[0], connection=self.conn)
		try:
			self.assertRaisesWithMsg(api.DataError,
				"Table public.valtest: type mismatch in column x (Incompatible"
				" type in DB: Expected spoint, found int4)",
				table.ensureOnDiskMatches,
				())
		finally:
			self.conn.rollback()
			table.drop()
	

from gavo.user import validation

class GavoTableValTest(testhelpers.VerboseTest):
	"""tests for the messages validation.validateTables writes for
	questionable column definitions.
	"""
	# stand-in for the parsed command line of the validator
	class defaultArgs:
		compareDB = False

	def _getRdWithTable(self, columns):
		# returns an RD with a single table containing the RD XML
		# fragment columns
		rdSource = (
			"""<resource schema="__test"><table id="totest" onDisk="True">
			<meta>
				creationDate: 2012-12-12T12:12:12
				subject: testing
				description: something to ignore
			</meta>
			%s</table></resource>"""%columns)
		rd = base.parseFromString(rscdesc.RD, rdSource)
		rd.sourceId = "testing/q"
		return rd
	
	def _getValFuncOutput(self, func, rd):
		# index 1 of captureOutput's result is what went to stdout
		captured = testhelpers.captureOutput(func, (rd, self.defaultArgs))
		return captured[1]

	def _getMessagesForColumns(self, columns):
		return self._getValFuncOutput(
			validation.validateTables,
			self._getRdWithTable(columns))

	def testReservedName(self):
		messages = self._getMessagesForColumns("<column name='distance'/>")
		self.assertEqual(messages,
			"[WARNING] testing/q: Column totest.distance: Name is not a regular ADQL identifier.\n")

	def testDelimitedId(self):
		messages = self._getMessagesForColumns(
			"<column name='quoted/distance'/>")
		self.assertEqual(messages, "")

	def testBadUCD(self):
		messages = self._getMessagesForColumns(
			"<column name='foo' ucd='em.IR'/>")
		self.assertEqual(messages,
			"[WARNING] testing/q: Column __test.totest.foo: UCD em.IR not accepted by astropy (Secondary word 'em.IR' is not valid as a primary word).\n")


from gavo.web import examplesrender

class RSTExtensionTest(testhelpers.VerboseTest):
	"""tests for the genparam text role in the HTML of examples.
	"""
	def _renderExample(self, rstSource):
		# returns the HTML (as bytes) produced for an example with
		# the content rstSource
		metaItem = base.META_CLASSES_FOR_KEYS["_example"](
			rstSource, title="Working genparam")
		return examplesrender._Example(metaItem)._getTranslatedHTML()

	def testWorkingExamples(self):
		res = self._renderExample(
			"Here is a :genparam:`par(example, no?)` "
			"example for genparam.")
		for fragment in [
				b'<span property="generic-parameter" typeof="keyval"',
				b'<span property="key" class="genparam-key">par</span>',
				b'<span property="value" class="genparam-value">'
					b'example, no?</span>']:
			self.assertIn(fragment, res)
	
	def testFailingExample(self):
		res = self._renderExample(
			"Here is a :genparam:`parfoo` "
			"example for genparam.")
		self.assertIn(b"parfoo does not", res)
		self.assertIn(b'<span class="problematic"', res)


def _getUpgraders():
	"""returns a list of three toy upgrade.Upgrader subclasses for
	UpgraderTest, in the order in which they are defined here.

	Their version attributes are 9, 10, and 19, which leaves a gap between
	the second and the third.
	"""
	class To10Upgrader(upgrade.Upgrader):
		version = 9
		s_010_foo = "CREATE TABLE foo"
		u_010_fill_foo = "INSERT INTO foo"

	class To11Upgrader(upgrade.Upgrader):
		version = 10

		s_010_bar = "CREATE TABLE bar"

		u_010_fill_bar = "INSERT INTO bar"

		u_005_register_bar = "INSERT INTO metastore"
	
	class To20Upgrader(upgrade.Upgrader):
		version = 19

		s_005_addheck = "ALTER TABLE bar ADD COLUMN quux"
		s_010_addheck = "ALTER TABLE bar ADD COLUMN honk"
	
	# List the classes explicitly rather than harvesting locals(): that way,
	# a helper variable added to this function later cannot slip into
	# the result.
	return [To10Upgrader, To11Upgrader, To20Upgrader]


class UpgraderTest(testhelpers.VerboseTest):
	"""tests for the sequence of statements upgrade.iterStatements
	produces from the toy upgraders.
	"""
	def testSchemachangeSort(self):
		plan = list(upgrade.iterStatements(9, 11, _getUpgraders()))
		self.assertEqual(plan[0], "CREATE TABLE foo")
		self.assertIs(plan[2], upgrade._COMMIT)
		self.assertEqual(plan[3], 'INSERT INTO foo')
		# item 4 is expected to be a bound method of the first upgrader
		versionUpdater = plan[4]
		self.assertIn("To10Upgrader", repr(versionUpdater.__self__))
		self.assertEqual(len(plan), 10)

	def testHoleCovered(self):
		plan = list(upgrade.iterStatements(10, 20, _getUpgraders()))
		self.assertEqual(plan[2], "ALTER TABLE bar ADD COLUMN honk")
		self.assertIs(plan[3], upgrade._COMMIT)
		secondToLast = plan[-2]
		self.assertIn("To20Upgrader", repr(secondToLast.__self__))


class TestScaffoldTest(testhelpers.VerboseTest):
	"""tests for the test infrastructure itself.
	"""
	def _fetchFromDataServer(self, payload):
		# serves payload through a DataServer and returns what a client
		# reads from the server's URL
		with testhelpers.DataServer(payload) as url:
			return utils.urlopenRemote(url).read()

	def testDataServer(self):
		self.assertEqual(
			self._fetchFromDataServer("success!\n"),
			b"success!\n")

		# try another to make sure the first has been killed; if
		# it hasn't then this second server can't be bound to the port.
		self.assertEqual(
			self._fetchFromDataServer(b"second success!\n"),
			b"second success!\n")


from gavo import rsc
from gavo.formats import fitstable
from gavo.user import mkrd

class FITSColGenTest(testhelpers.VerboseTest):
	"""tests for generating column definitions from a FITS binary table.
	"""
	def testBasic(self):
		expected = (
			'<column name="a" type="text"\n'
			'  ucd=""\n'
			'  description="Some weirdness"\n'
			'  verbLevel="1"/>\n'
			'<column name="b" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="c" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="d" type="integer"\n'
			'  unit="km" ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>\n'
			'<column name="e" type="text"\n'
			'  ucd=""\n'
			'  utype="junk.junk.junk"\n'
			'  description=""\n'
			'  verbLevel="1"/>')
		srcTable = api.TableForDef(api.resolveCrossId("data/test#abcd"))
		hdus = fitstable.makeFITSTable(rsc.wrapTable(srcTable))
		# hdus[1] is the extension the column attributes are read from
		colAttrs = mkrd.iterColAttrsFITS(hdus[1])
		self.assertEqual(mkrd.getColumnXML(colAttrs), expected)


class VOTableColGenTest(testhelpers.VerboseTest):
	"""tests for generating RD column elements from VOTables.
	"""
	def testBasic(self):
		"""tests the column XML mkrd derives from data/test#abcd
		serialised as a VOTable.
		"""
		# imported locally and under an alias because the module-level name
		# votable is already bound to gavo.votable.
		from gavo.formats import votable as votableformat
		tableDef = api.resolveCrossId("data/test#abcd")
		serialised = votableformat.getAsVOTable(api.TableForDef(tableDef))
		generated = mkrd.getColumnXML(
			mkrd.iterColAttrsVOTable(io.BytesIO(serialised)))

		# one entry per expected column element; they are joined with
		# newlines below.
		expectedColumns = [
			'<column name="a" type="text"\n'
			'  ucd=""\n'
			'  description="Some weirdness"\n'
			'  verbLevel="1"/>',
			'<column name="b" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>',
			'<column name="c" type="integer"\n'
			'  ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>',
			'<column name="d" type="integer"\n'
			'  unit="km" ucd=""\n'
			'  description=""\n'
			'  verbLevel="1"/>',
			'<column name="e" type="timestamp"\n'
			'  ucd=""\n'
			'  utype="junk.junk.junk"\n'
			'  description=""\n'
			'  verbLevel="1"/>',
		]
		self.assertEqual(generated, "\n".join(expectedColumns))


# A cut-down ReadMe-style text with a "Byte-by-byte Description" section;
# VizColGenTest feeds it to mkrd.iterColAttrsViz.
# NOTE: some lines in here end with a blank.  The double blanks in the
# descriptions VizColGenTest expects presumably stem from these, so do
# not strip trailing whitespace from this literal.
_BBB_EXAMPLE = """
Some random junk

Abstract:
  We provide a catalogue of 541 stars, brown dwarfs, and exoplanets in 339 
  outreach potential.

File Summary:
--------------------------------------------------------------------------------
 FileName    Lrecl   Records       Explanations
--------------------------------------------------------------------------------
ReadMe              80        .    this file
The10pcSample.dat   930       561  the 10 pc catalogue


--------------------------------------------------------------------------------
Byte-by-byte Description of file: The10pcSample.dat
--------------------------------------------------------------------------------
 Bytes    Format Units  Label            Explanations
--------------------------------------------------------------------------------
  1-  4   I4     ---    NB_OBJ           Running number for object
 11- 36   A26    ---    SYSTEM_NAME      Name of the system
 74- 87   F14.9  ---    RA               Right ascension in ICRS
129-159   A31    ---    PARALLAX_BIBCODE Reference for the parallax
178-193   F7.2   mas/yr PMRA_ERROR       Proper motion uncertainty in right 
                                       ascension
348-349   I2     ---    G_CODE           =2 if G from Gaia DR2 ; =3 if G from 
                                       Gaia EDR3 ; =10 if G derived from
                                       spectral type ; =20 for spectral type >T6
                                       (arbitrarily set to absolute G mag = 25)
583-930   I8     ---    COMMENT          Additional comments on exoplanets,
                                       multiplicity, etc

--------------------------------------------------------------------------------
"""

class VizColGenTest(testhelpers.VerboseTest):
	"""tests for generating RD column elements from byte-by-byte
	descriptions.
	"""
	def testBasic(self):
		"""tests the column XML mkrd derives from _BBB_EXAMPLE.
		"""
		generated = mkrd.getColumnXML(
			mkrd.iterColAttrsViz(io.StringIO(_BBB_EXAMPLE)))

		# one entry per expected column element; they are joined with
		# newlines below.  The double blanks in some of the descriptions
		# are part of the expected output.
		expectedColumns = [
			'<column name="NB_OBJ" type="integer"\n'
			'  ucd=""\n'
			'  description="Running number for object"\n'
			'  verbLevel="1"/>',
			'<column name="SYSTEM_NAME" type="text"\n'
			'  ucd=""\n'
			'  description="Name of the system"\n'
			'  verbLevel="1"/>',
			'<column name="RA" type="double precision"\n'
			'  ucd=""\n'
			'  description="Right ascension in ICRS"\n'
			'  verbLevel="1"/>',
			'<column name="PARALLAX_BIBCODE" type="text"\n'
			'  ucd=""\n'
			'  description="Reference for the parallax"\n'
			'  verbLevel="1"/>',
			'<column name="PMRA_ERROR" type="real"\n'
			'  unit="mas/yr" ucd=""\n'
			'  description="Proper motion uncertainty in right  ascension"\n'
			'  verbLevel="1"/>',
			'<column name="G_CODE" type="smallint"\n'
			'  ucd=""\n'
			'  description="=2 if G from Gaia DR2 ; =3 if G from  Gaia EDR3 ; =10 if G derived from spectral type ; =20 for spectral type >T6 (arbitrarily set to absolute G mag = 25)"\n'
			'  verbLevel="1"/>',
			'<column name="COMMENT" type="bigint"\n'
			'  ucd=""\n'
			'  description="Additional comments on exoplanets, multiplicity, etc"\n'
			'  verbLevel="1"/>',
		]
		self.assertEqual(generated, "\n".join(expectedColumns))


from gavo.base import typesystems

# I think we ought to have a test module for the whole type conversion
# thing -- I wonder why I didn't make one back when I wrote that stuff
class ScalarifyTest(testhelpers.VerboseTest,
		metaclass=testhelpers.SamplesBasedAutoTest):
	"""tests for typesystems.scalarify.

	_runTest is called with the items of samples (presumably one
	generated test per sample; see testhelpers.SamplesBasedAutoTest).
	"""
	def _runTest(self, sample):
		declaredType, expectedScalar = sample
		self.assertEqual(
			typesystems.scalarify(declaredType),
			expectedScalar)

	# pairs of (type passed to scalarify, expected result)
	samples = [
		("char", "char"),
		("spoly", "spoly"),
		("int[]", "int"),
		("int[12][]", "int"),]

	def testWithConversion(self):
		"""tests that scalarify's result can be passed on to sqltypeToPython.
		"""
		scalarType = typesystems.scalarify("bigint[22][]")
		fromLiteral = typesystems.sqltypeToPython(scalarType)
		self.assertEqual(fromLiteral("432"), 432)


from gavo.imp import astropyucd

class UCDTest(testhelpers.VerboseTest):
	"""tests for the UCD functions in gavo.imp.astropyucd.
	"""
	def testBasic(self):
		"""tests parsing a UCD made up of two words.
		"""
		parsed = astropyucd.parse_ucd("pos.eq.dec;meta.main")
		self.assertEqual(parsed, [
			('ivoa', 'pos.eq.dec'),
			('ivoa', 'meta.main')])

	def testColor(self):
		"""tests parsing a UCD made up of three words.
		"""
		parsed = astropyucd.parse_ucd("phot.color;em.opt.B;em.opt.V")
		self.assertEqual(parsed, [
			('ivoa', 'phot.color'),
			('ivoa', 'em.opt.B'),
			('ivoa', 'em.opt.V')])

	def testNonPrimary(self):
		"""tests that check_ucd rejects meta.ref.uri;meta.curation when
		checking against the controlled vocabulary.
		"""
		verdict = astropyucd.check_ucd(
			"meta.ref.uri;meta.curation",
			check_controlled_vocabulary=True)
		# compare against False proper; a merely falsy result is not enough.
		self.assertEqual(verdict, False)


if __name__ == "__main__":
	# when executed as a script, hand over to the test runner from
	# testhelpers, passing KVLMakeTest as the test case.
	testhelpers.main(KVLMakeTest)
