"""
Tests for services and cores.
"""

#c Copyright 2008-2021, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import datetime
import re
import os

from twisted.internet import defer
from twisted.internet import threads
from twisted.web import template

from gavo.helpers import testhelpers

from gavo import base
from gavo import formal
from gavo import rsc
from gavo import rscdef
from gavo import rscdesc
from gavo import protocols
from gavo import svcs
from gavo import utils
from gavo.formats import votablewrite
from gavo.formal import iformal
from gavo.formal import nevowc
from gavo.protocols import scs
from gavo.svcs import inputdef
from gavo.svcs import renderers
from gavo.web import common as webcommon
from gavo.web import formrender
from gavo.web import metarender
from gavo.web import vodal
from gavo.web import vosi

import tresc


MS = base.makeStruct


@contextlib.contextmanager
def _syncVOSI():
	"""A contextmanager that makes the VOSI renderers (and, really, all
	that just user deferToThread) synchronous for while it's running.
	"""
	dtt = threads.deferToThread
	threads.deferToThread = defer.maybeDeferred
	try:
		yield
	finally:
		threads.deferToThread = dtt


@contextlib.contextmanager
def handlingHTTPS(callingObj):
	"""a context manager that temporarily slaps an isSecure method
	on callingObj.

	This is a bit of a fake of a twisted request; don't do that on actual
	requests, because you'll nuke the actual method.
	"""
	callingObj.isSecure = lambda: 1
	try:
		yield
	finally:
		delattr(callingObj, "isSecure")


def _mkd(**kwargs):
	return kwargs


class RequestTest(testhelpers.VerboseTest):
	def testBinaryItemsDontPickle(self):
		import pickle
		self.assertRaisesWithMsg(ValueError,
			"You cannot (and should not) pickle BinaryItems",
			pickle.dumps,
			([webcommon.BinaryItem(("abc", 2))],))


class ErrorMsgTest(testhelpers.VerboseTest):
	def testMissingQueriedTable(self):
		self.assertRaisesWithMsg(base.StructureError,
			'At IO:\'<resource schema="test"> <dbCore id="foo"/> </resource>\', (2, 22):'
			' You must set queriedTable on dbCore elements',
			base.parseFromString,
			(rscdesc.RD,
			"""<resource schema="test">
				<dbCore id="foo"/>
			</resource>"""))


class GetURLTest(testhelpers.VerboseTest):
	def testAbsolute(self):
		self.assertEqual(
			testhelpers.getTestRD("cores").getById("cstest"
				).getURL("form"),
			"http://localhost:8080/data/cores/cstest/form")

	def testRelative(self):
		self.assertEqual(
			testhelpers.getTestRD("cores").getById("cstest"
				).getURL("form", absolute=False),
			"/data/cores/cstest/form")

	def testWithParam(self):
		self.assertEqual(
			testhelpers.getTestRD("cores").getById("cstest"
				).getURL("form", arg1="10%", arg2="/&!;="),
			'http://localhost:8080/data/cores/cstest/form?arg1=10%25&'
			'arg2=%2F%26%21%3B%3D')

	def testHTTPS(self):
		with handlingHTTPS(self):
			service = testhelpers.getTestRD("cores").getById("cstest")
			self.assertEqual(service.getURL("form"),
				"https://localhost/data/cores/cstest/form")
			self.assertEqual(base.getMetaText(service.publications[0], "accessURL"),
				"http://localhost:8080/data/cores/cstest/form")


class PublishTest(testhelpers.VerboseTest):
	def testPublishInvalidSet(self):
		self.assertRaisesWithMsg(base.StructureError,
			 'At IO:\'<service id=\'foo\'><nullCore/><publish render="form" s...\', (2, 35):'
			" Invalid OAI set(s) unmanaged",
			base.parseFromString,
			(svcs.Service,
				"""<service id='foo'><nullCore/><publish render="form"
					sets="ivo_managed,unmanaged"/></service>"""))


class InputTDTest(testhelpers.VerboseTest):
	resources = [("adqltable", tresc.csTestTable)]

	def testIterating(self):
		it = base.parseFromString(svcs.InputTD, """<inputTable>
				<inputKey name="par1"/><inputKey name="par2"/>
			</inputTable>""")
		results = []
		for k in it:
			results.append(k)
		self.assertEqual([k.name for k in results],
			["par1", "par2"])

	def testNameResolution(self):
		res = base.parseFromString(rscdesc.RD, """<resource schema="test">
			<table id="t">
				<column name="romp" displayHint="sf=15"/>
				<column id="henk" name="bobble" ucd="meta.flag"/>
			</table>
			<dbCore id="c" queriedTable="t">
				<inputTable>
					<inputKey original="romp"/>
					<inputKey original="henk"/>
				</inputTable>
			</dbCore>
			</resource>""")
		self.assertEqual(res.getById("c").inputTable.inputKeys[0
			].displayHint["sf"], '15')
		self.assertEqual(res.getById("c").inputTable.inputKeys[1
			].ucd, "meta.flag")

	def testFromDB(self):
		rd = base.parseFromString(rscdesc.RD, """<resource schema="test">
			<service><nullCore><inputTable>
				<inputKey name="rv" id="testcase">
					<values fromdb="alpha FROM \schema.csdata"/></inputKey>
			</inputTable></nullCore></service></resource>""")
		self.assertEqual(rd.getById("testcase").values.options[0].title, "10.0")

	def testNULLValues(self):
		ik = MS(svcs.InputKey, name="foo", type="text",
			values=MS(rscdef.Values, options=
				[MS(rscdef.Option, content_="")]))
		it = MS(svcs.InputTD, inputKeys=[ik])
		self.assertEqual(svcs.CoreArgs.fromRawArgs(it, {}).args,
			{'foo': None})
		self.assertEqual(svcs.CoreArgs.fromRawArgs(it, {"foo": []}).args,
			{'foo': None})
		self.assertEqual(svcs.CoreArgs.fromRawArgs(it, {"foo": [""]}).args,
			{'foo': None})


class InputKeyParsingTestGood(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		keyDef, inputList, expected = sample
		ik = base.parseFromString(svcs.InputKey, 
			'<inputKey name="foo" %s'%keyDef)
		self.assertEqual(ik.computeCoreArgValue(inputList), expected)
	
	samples = [
		('/>', ['13.25'], 13.25),
		('/>', ['12.3', '13.25'], 13.25),
		('/>', [], None),
		('multiplicity="single"/>', [], None),
		('multiplicity="forced-single"/>', [], None),
#5
		('multiplicity="single"/>', ['22'], 22),
		('multiplicity="forced-single"/>', ['22'], 22),
		('type="text"><values><option>knuz</option></values></inputKey>',
			[],
			None),
		('type="text"><values><option>knuz</option></values></inputKey>',
			["knuz"],
			["knuz"]),
		('type="text"><values><option>a</option><option>b</option>'
			'</values></inputKey>',
			["a", "b"],
			["a", "b"]),
#10
		('type="integer[2]"/>', ['12 13'], [12, 13]),
		('type="integer[2]" multiplicity="multiple"/>', ['12 13'], [[12, 13]]),
		('type="integer[2]" multiplicity="multiple"/>', 
			['12 13', '14 15'], [[12, 13], [14, 15]]),
		('type="integer[]"/>', ["4"], [4,]),
		('type="integer[3]"/>', ["4 4 5"], [4,4,5]),
#15
		('type="integer[3]" multiplicity="multiple"/>', 
			["4 4 5", "7 8 9"],
			[[4,4,5], [7,8,9]]),
		('multiplicity="multiple"/>', [], None),
		('type="text" preparse="return input.upper()"/>', ["abc"], "ABC"),
	]


class InputKeyParsingTestBad(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		keyDef, inputList, msg = sample
		ik = base.parseFromString(svcs.InputKey, 
			'<inputKey name="foo" %s'%keyDef)
		self.assertRaisesWithMsg(
			(base.ValidationError, base.MultiplicityError),
			msg,
			ik.computeCoreArgValue,
			(inputList,))

	samples = [
		('required="True"/>', [], "Field foo: Required parameter 'foo' missing."),
		('multiplicity="forced-single"/>', ["22", "3"], 
			"Field foo: Inputs for the parameter foo must not have more than"
				" one value; hovever, ['22', '3'] was passed in."),
		('type="text"><values><option>knuz</option></values></inputKey>', 
			["22", "3"],
			"Field foo: '22' is not a valid value for foo"),
		('multiplicity="multiple" required="True"/>', [],
			"Field foo: Required parameter 'foo' missing."),
	]


class CoreArgsTest(testhelpers.VerboseTest):
	def _assertPartialDict(self, partialExpectation, result):
		"""checks that all key/value pairs in partialExpectation are present in
		result.
		"""
		for key, value in partialExpectation.items():
			self.assertEqual(value, result[key])

	def _getTableForInputKey(self, source, **keyPars):
		return svcs.CoreArgs.fromRawArgs(
				MS(svcs.InputTD,
					inputKeys=[MS(svcs.InputKey, name="x", **keyPars)]),
				source)

	def testCarryingMeta(self):
		ca = svcs.CoreArgs.fromRawArgs(
			base.parseFromString(svcs.InputTD, """<inputTable>
			<inputKey name="par1"/></inputTable>"""),
			{})
		ca.setMeta("test.grubbel", "testValue")
		self.assertEqual(
			ca.getMeta("test").getMeta("grubbel").getContent("text"), 
			"testValue")

	def testTypedIntSet(self):
		t = self._getTableForInputKey({"x": ["22", "24"]},
			type="integer")
		self._assertPartialDict({"x": 24}, t.args)

	def testTypedIntVal(self):
		t = self._getTableForInputKey({"x": "22"},
			type="integer")
		self._assertPartialDict({"x": 22}, t.getParamDict())

	def testTypedSingle(self):
		t = self._getTableForInputKey({"x": ["-22"]},
				type="real", multiplicity="single")
		self._assertPartialDict({"x": -22}, t.getParamDict())

	def testBadLiteralRejected(self):
		self.assertRaises(base.ValidationError,
			self._getTableForInputKey,
			{"x": "gogger"}, type="integer")
	
	def testMultipleRejected(self):
		self.assertRaises(base.ValidationError,
			self._getTableForInputKey,
			{"x": ["-22", "30"]},
			type="real", multiplicity="forced-single")

	def testMissingRequired(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field x: Required parameter 'x' missing.",
			self._getTableForInputKey,
			({},),
			type="real", required="True")

	def testMissingStringRequired(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field x: Required parameter 'x' missing.",
			self._getTableForInputKey,
			({"x": []},),
			type="text", required=True)

	def testExtrasRejected(self):
		it = MS(svcs.InputTD,
			inputKeys=[MS(svcs.InputKey, name="x")], exclusive=True)
		self.assertRaisesWithMsg(base.ValidationError,
			"Field (various): The following parameter(s) are not"
			" accepted by this service: y",
			svcs.CoreArgs.fromRawArgs,
			(it, {"x": [3.5], "y": [9]}))

	def testUppercaseNotRejected(self):
		it = MS(svcs.InputTD,
			inputKeys=[MS(svcs.InputKey, name="x")], exclusive=True)
		ca = svcs.CoreArgs.fromRawArgs(it, {"X": [3.5]})
		self.assertEqual(ca.args, {'x': 3.5})

	def testLowercaseNotRejected(self):
		it = MS(svcs.InputTD,
			inputKeys=[MS(svcs.InputKey, name="X")], exclusive=True)
		ca = svcs.CoreArgs.fromRawArgs(it, {"x": [3.5]})
		self.assertEqual(ca.args, {'X': 3.5})

	def testMissingFile(self):
		it = MS(svcs.InputTD,
			inputKeys=[MS(svcs.InputKey, name="X", type="file")])
		ca = svcs.CoreArgs.fromRawArgs(it, {})
		self.assertEqual(ca.args['X'], None)


class _ContextGrammarOutput(testhelpers.TestResource):
# XXX TODO: more types: unicode, spoint...
	def make(self, ignored):
		grammar = base.parseFromString(inputdef.ContextGrammar,
			"""<contextGrammar>
				<inputKey name="anint" type="integer"/>
				<inputKey name="afloat" type="real"/>
				<inputKey name="adouble" type="double precision"/>
				<inputKey name="atext" type="text"/>
				<inputKey name="adate" type="date"/>
				<inputKey name="chars" type="char" multiplicity="multiple"/>
			</contextGrammar>
			""")
		ri = grammar.parse(
			{	"anint": ["22"],
				"afloat": ["-2e-7"],
				"adouble": ["-2"],
				"atext": ["foo", "bar"],
				"adate": ["2013-05-04", "2005-02-02"],
				"chars": ["2", "a", "ä"],
			})
		list(ri)
		return ri.getParameters()


class ContextGrammarTest(testhelpers.VerboseTest):
	resources = [("resPars", _ContextGrammarOutput())]

	def testInteger(self):
		self.assertEqual(self.resPars["anint"], 22)

	def testFloat(self):
		self.assertEqual(self.resPars["afloat"], -2e-7)

	def testDouble(self):
		self.assertEqual(self.resPars["adouble"], -2)

	def testText(self):
		self.assertEqual(self.resPars["atext"], "bar")

	def testDate(self):
		self.assertEqual(self.resPars["adate"], datetime.date(2005, 2, 2))

	def testChar(self):
		self.assertEqual(self.resPars["chars"], ["2", "a", "ä"])


class InputTableGenTest(testhelpers.VerboseTest):
	def _getInputTableFor(self, svcId, inputDict):
		service = testhelpers.getTestRD("cores").getById(svcId)
		return svcs.CoreArgs.fromRawArgs(
			service.getCoreFor("form", svcs.emptyQueryMeta).inputTable, inputDict)

	def testDefaulting(self):
		ca = self._getInputTableFor("cstest", _mkd(
				hscs_pos="Aldebaran", hscs_SR="0.25"))
		self.assertEqual(ca.args, {
			'rV': '-100 .. 100', 'hscs_sr': 0.25, 'mag': None, 
			'hscs_pos': 'Aldebaran'})

	def testInvalidLiteral(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field hscs_sr: 'a23' is not a valid literal for hscs_sr",
			self._getInputTableFor,
			("cstest", _mkd(
				hscs_pos="Aldebaran", hscs_sr="a23")))

	def testEnumeratedNoDefault(self):
		ca = self._getInputTableFor("enums", {})
		self.assertEqual(ca.args, {'a': None, 'b': None,
			'c':[1]})

	def testCaseFolding(self):
		ca = self._getInputTableFor("enums", _mkd(B=2))
		self.assertEqual(ca.args, {'a': None, 'b': [2],
			'c':[1]})

	def testEnumeratedBadValues(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Field b: '3' is not a valid value for b",
			self._getInputTableFor,
			("enums", _mkd(b="3")))


class PlainSQLGenerationTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		ikAttrs, rawArgs, expectedSQL, expectedSQLArgs = sample
		ik = base.parseFromString(svcs.InputKey, '<inputKey %s>'%ikAttrs)
		inputTable = svcs.CoreArgs.fromRawArgs(
			MS(svcs.InputTD, inputKeys=[ik]), rawArgs)
		outPars = {}
		self.assertEqual(
			base.getSQLForField(ik, inputTable.args, outPars),
			expectedSQL)
		self.assertEqual(
			outPars,
			expectedSQLArgs)
	
	samples = [
		('name="foo" type="integer"/', {"foo": [2]},
			"foo=%(foo0)s", {'foo0': 2}),
		('name="foo" type="integer"/', {"foo": []},
			None, {}),
		('name="foo" type="integer" multiplicity="multiple"/', {"foo": [2]},
			"foo=%(foo0)s", {'foo0': 2}),
		('name="foo" type="integer" multiplicity="multiple"/', {"foo": [2,4,5]},
			"foo IN %(foo0)s", {'foo0': set([2,4,5])}),
		('name="foo" type="integer" multiplicity="multiple"/', {"foo": []},
			None, {}),
#5
		('name="foo" type="integer" multiplicity="multiple"/', {},
			None, {}),
		# is this behaviour we actually want?  This isn't theoretical
		# either: you can produce it by passing in null literals.
		('name="foo" type="integer"/', {"foo": [None]},
			None, {}),
		('name="foo" type="integer"><values><option/></values>'
			'</inputKey', 
			{"foo": []},
			None, {}),
		('name="foo" type="text"><values default="nix"/></inputKey',
			{},
			"foo=%(foo0)s", {'foo0': 'nix'}),
	]


class PlainDBServiceTest(testhelpers.VerboseTest):
	"""tests for working db-based services, having defaults for everything.
	"""
	resources = [("prodtbl", tresc.prodtestTable)]
	def setUp(self):
		testhelpers.VerboseTest.setUp(self)
		self.rd = testhelpers.getTestRD()

	def testEmptyQuery(self):
		svc = self.rd.getById("basicprod")
		res = testhelpers.runSvcWith(svc, "get", {})
		namesFound = set(r["object"] for r in res.getPrimaryTable().rows)
		self.assertTrue(set(["gabriel", "michael"])<=namesFound)

	def testOneParameterQuery(self):
		svc = self.rd.getById("basicprod")
		res = testhelpers.runSvcWith(svc, "form", {"accref": "~ *a.imp"})
		namesFound = set(r["object"] for r in res.getPrimaryTable().rows)
		self.assertTrue("gabriel" in namesFound)
		self.assertTrue("michael" not in namesFound)

	def testTwoParametersQuery(self):
		svc = self.rd.getById("basicprod")
		res = testhelpers.runSvcWith(svc, "form", 
			{"accref": "~ *a.imp", "embargo": "< 2000-03-03"})
		namesFound = set(r["object"] for r in res.getPrimaryTable().rows)
		self.assertTrue("gabriel" not in namesFound)
		self.assertTrue("michael" not in namesFound)


class _DatafieldsTestMixin(object):
	def assertDatafields(self, columns, names):
		self.assertEqual(len(columns), len(names), "Wrong number of columns"
			" returned, expected %d, got %s"%(len(names), len(columns)))
		for c, n in zip(columns, names):
			self.assertEqual(c.name, n, "Got column %s instead of %s"%(c.name, n))


class VerblevelBasicTest(testhelpers.VerboseTest, _DatafieldsTestMixin, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		inData, expected = sample
		svc = testhelpers.getTestRD("cores.rd").getById("basiccat")
		resTable = testhelpers.runSvcWith(svc, "form", inData
			).getPrimaryTable()
		self.assertDatafields(resTable.tableDef.columns, expected)

	samples = [
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "verbosity": "2", "_FORMAT": "VOTable"},
			["a"]),
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "VERB": "1", "_FORMAT": "VOTable"},
			["a", "b"]),
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "_VERB": "2", "_FORMAT": "VOTable"},
			["a", "b", "c", "d"]),
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "_VERB": "3", "_FORMAT": "VOTable"},
				["a", "b", "c", "d", "e"]),
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "_VERB": "HTML", "_FORMAT": "VOTable"},
			["a"]),
		({"a": "xy", "b": "3", "c": "4", "d": "5",
			"e": "2005-10-12T12:23:01", "_FORMAT": "VOTable"},
			["a", "b", "c", "d"])]


class ComputedServiceTest(testhelpers.VerboseTest, _DatafieldsTestMixin):
	"""tests a simple service with a computed core.
	"""
	def setUp(self):
		self.rd = testhelpers.getTestRD("cores.rd")

	def testStraightthrough(self):
		svc = self.rd.getById("basiccat")
		res = testhelpers.runSvcWith(svc, "form", _mkd(a="xy", b="3", c="4", d="5",
				e="2005-10-12T12:23:01"))
		self.assertEqual(res.getPrimaryTable().rows, [
			{'a': 'xy', 'c': 4, 'b': 3, 
				'e': datetime.datetime(2005, 10, 12, 12, 23, 1), 'd': 5}])

	def testMappedOutput(self):
		svc = self.rd.getById("convcat")
		res = testhelpers.runSvcWith(svc, "form", _mkd(a="xy", b="3", c="4", d="5",
				e="2005-10-12T12:23:01"))
		self.assertDatafields(res.getPrimaryTable().tableDef.columns, 
			["a", "b", "d"])
		self.assertEqual(res.getPrimaryTable().
			tableDef.columns[0].verbLevel, 15)
		self.assertEqual(res.getPrimaryTable().rows[0]['d'], 5000.)

	def testAdditionalFields(self):
		svc = self.rd.getById("convcat")
		res = testhelpers.runSvcWith(svc, "form", 
				_mkd(a=["xy"], b="3", c="4", d="5", e="2005-10-12T12:23:01", 
					_ADDITEM=["c", "e"]))
		self.assertDatafields(res.getPrimaryTable().tableDef.columns, 
			["a", "b", "d", "c", "e"])

	def testTableSet(self):
		svc = self.rd.getById("basiccat")
		res = svc.getTableSet()
		self.assertEqual(len(res), 1)
		self.assertEqual(res[0].columns[0].name, "a")


class BrowsableTest(testhelpers.VerboseTest):
	"""tests for selection of URLs for browser users.
	"""
	def testBrowseableMethod(self):
		service = testhelpers.getTestRD("pubtest.rd").getById("moribund")
		self.assertTrue(service.isBrowseableWith("form"))
		self.assertTrue(service.isBrowseableWith("external"))
		self.assertFalse(service.isBrowseableWith("static"))
		self.assertFalse(service.isBrowseableWith("scs.xml"))
		self.assertFalse(service.isBrowseableWith("pubreg.xml"))
		self.assertFalse(service.isBrowseableWith("bizarro"))
	
	def testStaticWithIndex(self):
		service = testhelpers.getTestRD().getById("basicprod")
		# service has an indexFile property
		self.assertTrue(service.isBrowseableWith("static"))

	def testURLSelection(self):
		service = testhelpers.getTestRD("pubtest.rd").getById("moribund")
		self.assertEqual(service.getBrowserURL(), 
			"http://localhost:8080/data/pubtest/moribund/form")

	def testRelativeURLSelection(self):
		service = testhelpers.getTestRD("pubtest.rd").getById("moribund")
		self.assertEqual(service.getBrowserURL(fq=False), 
			"/data/pubtest/moribund/form")


def _flattenTag(tag):
	class El(template.Element):
		loader = template.TagLoader(tag)
	return nevowc.flattenSync(El())


class InputKeyTest(testhelpers.VerboseTest):
	# tests for type/widget inference with input keys.
	def _getKeyProps(self, src):
		cd = base.parseFromString(svcs.CondDesc, src
			).adaptForRenderer(svcs.getRenderer("form"))
		ik = cd.inputKeys[0]
		ftype = formrender._getFormalType(ik)
		fwid = formrender._getWidgetFactory(ik)
		request = testhelpers.FakeRequest("")
		tag = fwid(ftype).render(request, "foo", {}, None)
		rendered = _flattenTag(tag.fillSlots(formName="test"))
		tree = testhelpers.getXMLTree(
			b"<result>"+rendered+b"</result>",
			debug=False)
		return ftype, fwid, tree
	
	def _getAdapted(self, rendName, **keyProps):
		it = MS(svcs.InputTD, inputKeys=[
			MS(svcs.InputKey, name="foo", **keyProps)])
		it.inputKeys[0].setProperty("adaptToRenderer", "True")
		return it.adaptForRenderer(svcs.getRenderer(rendName)).inputKeys[0]

	def _runWithData(self, fwid, ftype, data):
		req = testhelpers.FakeRequest("")
		req.remember({}, iformal.IFormData)
		widget = fwid(ftype)
		return widget.processInput(req, "x", {}, widget.default)

	def testAllAuto(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey name="foo" type="text"/></condDesc>')
		self.assertTrue(isinstance(ftype, formal.String))
		self.assertEqual(ftype.required, False)
		self.assertEqual(rendered.xpath("input/@type"), ["text"])

	def testRequiredCondDesc(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc required="True"><inputKey name="foo" type="text"/></condDesc>')
		self.assertEqual(ftype.required, True)

	def testUnicodeCondDesc(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey name="foo" type="unicode"/></condDesc>')
		self.assertEqual(ftype.__class__.__name__, "String")

	def testNotRequiredCondDesc(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey name="foo" type="text" required="True"/></condDesc>')
		self.assertEqual(ftype.required, False)

	def testBuildFrom(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc buildFrom="data/testdata#data.afloat"/>')
		self.assertTrue(isinstance(ftype, formal.types.String))
		self.assertEqual(
			rendered.xpath("span/span[@class='fieldlegend']/a")[0].text,
			"[?num. expr.]")

	def testBuildFromEnumerated(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc buildFrom="data/testdata#data.atext"/>')
		self.assertEqual(rendered.xpath("span/input/@value"),
			["bla", "blubb"])
	
	def testFromDB(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey name="foo" type="integer">'
			'<values fromdb="generate_series(1,7)"/></inputKey></condDesc>')
		self.assertEqual(set(rendered.xpath("span/select/option/@value")),
			set("1234567"))

	def testWithOriginalAndFT(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey original="data/testdata#data.afloat"'
				' type="integer"/></condDesc>')
		self.assertTrue(isinstance(ftype, formal.types.Integer))

	def testWithEnumeratedOriginal(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey original="data/testdata#nork.cho"/></condDesc>')
		self.assertTrue(isinstance(ftype, formal.types.String))
		self.assertEqual(rendered.xpath("//input/@value"), ["a", "b"])
		self.assertEqual(rendered.xpath("//input/@type")[0], "checkbox")

	def testWithEnumeratedDefaultedRequired(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey name="m" type="text" required="True"'
			' multiplicity="forced-single"><values default="i">'
			' <option>i</option><option>u</option></values></inputKey></condDesc>')
		self.assertTrue(isinstance(ftype, formal.types.String))
		opts = rendered.xpath("div/input[@type='radio']/@value")
		self.assertEqual(opts, ['i', 'u'])
	
	def testManualWF(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc><inputKey type="text" name="x" widgetFactory="'
				'widgetFactory(ScalingTextArea, rows=\'15\')"/></condDesc>')
		self.assertEqual(rendered.xpath("textarea/@rows"), ["15"])

	def testIntervalRendered(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<FEED source="//siap2#TIMEpar"/>')
		self.assertEqual(
			rendered.xpath("div/input/@name"), 
			["foolower", "fooupper"])
		self.assertEqual(
			rendered.xpath("div/input[@type='text']")[0].tail,
			" \u2013 ")
	
	def testDateIntervalProcessed(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<FEED source="//siap2#TIMEpar"/>')
		wid = fwid(ftype)
		res = wid.processInput(testhelpers.FakeRequest(""), 
			# TODO: We should probably use vizier date expressions here
			"TIME", _mkd(TIMElower=[b"54124"], TIMEupper=[b"54125"]))
		self.assertEqual(res, [54124., 54125.])

	def testSpointWidget(self):
		cd = base.parseFromString(svcs.CondDesc, '<condDesc buildFrom='
			'"data/ssatest#hcdtest.ssa_location"/>'
			).adaptForRenderer(svcs.getRenderer("form"))
		outPars = {}
		self.assertEqual(
			cd.asSQL({"pos_ssa_location": "23,-43", "sr_ssa_location": "3"},
				outPars, svcs.emptyQueryMeta),
			"ssa_location <@ scircle(%(pos0)s, %(sr0)s)")
		self.assertAlmostEqual(outPars["sr0"], 3/60.*utils.DEG)
		self.assertEqual(outPars["pos0"].asSTCS("Junk"),
			'Position Junk 23. -43.')

	def testPlaceholder(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc buildFrom="data/test#valSpec.a_num"/>')
		self.assertEqual(rendered.xpath("span/input/@placeholder"),
			["10.0 .. 15.0"])
	
	def testPlaceholderUnitconv(self):
		ftype, fwid, rendered = self._getKeyProps(
			'<condDesc buildFrom="data/test#adql.rV"/>')
		self.assertEqual(rendered.xpath("span/input/@placeholder"),
			["-20.0 .. 200.0"])
	
	def testTimestamp(self):
		ik = self._getAdapted("form", type="timestamp")
		self.assertEqual(ik.type, "vexpr-date")


class InputFieldSelectionTest(testhelpers.VerboseTest):
	# Tests for renderer-dependent selection and adaptation of db core 
	# input fields.
	def testForm(self):
		service = testhelpers.getTestRD("cores").getById("cstest")
		self.assertEqual(
			[(k.name, k.type) for k in service.getInputKeysFor("form")],
			[("hscs_pos", "text"), ("hscs_sr", "real"), ("mag", "vexpr-float"),
				('rV', 'vexpr-float')])

	def testSCS(self):
		service = testhelpers.getTestRD("cores").getById("cstest")
		self.assertEqual(
			[(k.name, k.type) for k in service.getInputKeysFor("scs.xml")],
			 [('RA', 'double precision'), ('DEC', 'double precision'), 
			 	('SR', 'real'), 
			 	('mag', 'real[2]'), ('rV', 'vexpr-float'),
				('responseformat', 'text'), ('maxrec', 'integer'),
				('verb', 'integer')])

	def testSSAPPrune(self):
		svc = base.caches.getRD("data/ssatest").getById("d")
		self.assertFalse("FORMAT" in
			[k.name for k in svc.getInputKeysFor("ssap.xml")])

	def testAPIKeysVanish(self):
		svc = base.parseFromString(svcs.Service, """<service id="x"><nullCore/>
			<FEED source="//pql#DALIPars"/></service>""")
		self.assertEqual([],
			[k.name for k in svc.getInputKeysFor("form")])
		self.assertEqual(["responseformat", "maxrec", "verb"],
			[k.name for k in svc.getInputKeysFor("api")])


class _DALIParameters(testhelpers.TestResource):
	def make(self, deps):
		svc = base.parseFromString(svcs.Service, """<service id="x">
			<dbCore queriedTable="data/test#typesTable">
				<condDesc buildFrom="anint"/>
				<condDesc buildFrom="afloat"/>
				<condDesc buildFrom="adouble"/>
				<condDesc buildFrom="atext"/>
				<condDesc>
					<inputKey name="enum" type="smallint" multiplicity="single">
						<values>
							<option>1</option>
							<option>2</option>
						</values>
					</inputKey>
				</condDesc>
				<condDesc>
					<inputKey name="single"/>
				</condDesc>
				<condDesc buildFrom="data/test#abcd.e"/>
				<condDesc>
					<inputKey name="arr">
						<property key="adaptToRenderer">True</property>
					</inputKey>
				</condDesc>
			</dbCore>
			<FEED source="//pql#DALIPars"/></service>""")
		keys = dict((k.name, k) for k in svc.getInputKeysFor("api"))
		return keys


_daliParameters = _DALIParameters()

class DALIAdaptationTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [('pars', _daliParameters)]

	def _runTest(self, sample):
		parName, attrName, expectedValue = sample
		self.assertEqual(getattr(self.pars[parName], attrName), expectedValue)
	
	samples = [
		('enum', 'xtype', None),
		('enum', 'type', 'smallint'),
		('enum', 'multiplicity', 'single'),
		('maxrec', 'type', 'integer'),
		('maxrec', 'xtype', None),
# 5
		('anint', 'xtype', 'interval'),
		('anint', 'type', 'integer[2]'),
		('afloat', 'xtype', 'interval'),
		('afloat', 'type', 'real[2]'),
		('single', 'type', 'real'),
# 10
		('arr', 'type', 'real[2]'),
		('arr', 'xtype', 'interval'),
		('e', 'type', 'double precision[2]'),
		('e', 'unit', 'd'),
		('e', 'xtype', 'interval'),
# 15
		('atext', 'type', 'unicode'),
	]


import unittest

class DALIMetadataTest(testhelpers.VerboseTest):
	resources = [('pars', _daliParameters)]

	def testRangeCopied(self):
		par = self.pars["afloat"]
		self.assertEqual(par.values.min, "-1000.0")
		self.assertEqual(par.values.max, "2002.0")

	def testMinTypedWorks(self):
		par = self.pars["afloat"]
		self.assertEqual(par.values.min_typed, -1000.0)


class GroupingTest(testhelpers.VerboseTest):
	def _renderForm(self, svc):
		req = testhelpers.FakeRequest("")

		rend = formrender.FormRenderer(req, svc)
		form = rend.form_genForm(req)
		form.name = "foo"
		req = testhelpers.FakeRequest("")
		return nevowc.flattenSync(
			rend._getDoc(req), req)

	def testExplicitGroup(self):
		rendered = self._renderForm(
			testhelpers.getTestRD("cores").getById("grouptest"))
		#testhelpers.printFormattedXML(rendered)
		self.assertTrue(b'<fieldset id="genForm-magic" class="group localstuff"' 
			in rendered)
		self.assertTrue(b'<legend>Magic</legend>' in rendered)
		self.assertTrue(b'<div class="description">Some magic parameters we took'
			in rendered)

	def testImplicitGroup(self):
		# automatic grouping of condDescs with group
		rendered = self._renderForm(
			testhelpers.getTestRD("cores").getById("impgrouptest"))
		self.assertTrue(b'<div class="multiinputs" id="multigroup-phys">' 
			in rendered)
		self.assertTrue(b'<label for="multigroup-phys">Wonz' in rendered)
		renderedInput = re.search(b'<input[^>]*name="rV"[^>]*>', rendered).group(0)
		self.assertTrue(b'class="inmulti"' in renderedInput)


class OutputTableTest(testhelpers.VerboseTest):
	resources = [("adqltable", tresc.csTestTable)]

	def testResolution(self):
		base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo">
			<column name="bar"/></table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable autoCols="bar"/>
			</service></resource>""")

	def testNamePath(self):
		base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo"><column name="bar"/></table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable><outputField original="bar"/></outputTable>
			</service></resource>""")

	def testVerbLevel(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo">
			<column name="bar" verbLevel="7"/>
			<column name="baz" verbLevel="8"/></table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable verbLevel="7"/>
			</service></resource>""")
		cols = rd.services[0].outputTable.columns
		self.assertEqual(len(cols), 1)
		self.assertEqual(cols[0].name, "bar")

	def testParams(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo">
			<param name="bar" verbLevel="7"/>
			<param name="baz" verbLevel="8"/>
			<param name="quux" verbLevel="9"/>
			</table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable verbLevel="7" autoCols="quux"/>
			</service></resource>""")
		pars = rd.services[0].outputTable.params
		self.assertEqual(len(pars), 2)
		self.assertEqual(pars[0].name, "quux")
		self.assertEqual(pars[1].name, "bar")

	def testSTCPreserved(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo">
			<stc>Position ECLIPTIC Epoch J2200.0 "bar" "bar"</stc>
			<column name="bar"/></table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable autoCols="bar"/>
			</service></resource>""")
		self.assertEqual("ECLIPTIC",
			rd.services[0].outputTable.columns[0].stc.place.frame.refFrame)

	def testSTCPreservedOriginal(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo">
			<stc>Position ECLIPTIC Epoch J2200.0 "bar" "bar"</stc>
			<column name="bar"/></table>
			<service id="quux"><dbCore queriedTable="foo"/>
			<outputTable><outputField original="bar"/></outputTable>
			</service></resource>""")
		self.assertEqual("ECLIPTIC",
			rd.services[0].outputTable.columns[0].stc.place.frame.refFrame)

	def testAutoColsSingleWildcard(self):
		svc = base.parseFromString(svcs.Service,
			"""<service id="quux" core="data/cores#cscore">
				<outputTable autoCols="*"/>
			</service>""")
		cols = svc.getCurOutputFields(svcs.QueryMeta({"VERB": 30}))
		self.assertEqual([c.name for c in cols],
			["alpha", "delta", "mag", "rV", "tinyflag"])

	def testAutoColsMultiWildcard(self):
		svc = base.parseFromString(svcs.Service,
			"""<service id="quux" core="data/cores#cscore">
				<outputTable autoCols="*a,*g"/>
			</service>""")
		cols = svc.getCurOutputFields(svcs.QueryMeta({"VERB": 30}))
		self.assertEqual([c.name for c in cols],
			["alpha", "delta", "mag", "tinyflag"])

	def testCustomTableSelection(self):
		svc = base.parseFromString(svcs.Service,
			"""<service id="quux" core="data/cores#cscore">
				<outputTable verbLevel="15">
					<outputField original="mag" unit="mmag" select="mag*1000"/>
				</outputTable>
			</service>""")
		res = testhelpers.runSvcWith(svc, "form", _mkd(verbosity="25")
			).getPrimaryTable().tableDef
		self.assertEqual(len(res.columns), 3)
		self.assertEqual(res.getColumnByName("mag").unit, "mmag")


_RD_WITH_DL = """<resource schema="test">
	<table id="wdl">
		<meta name="_associatedDatalinkService">
			<meta name="serviceId">grok</meta>
			<meta name="idColumn">id</meta>
		</meta>
		%s
	</table>
	<service id="grok" allowed="api,dlmeta">
		<pythonCore>
			<outputTable original="wdl"/>
			<coreProc>
				<code>
					return rsc.TableForDef(service.rd.getById("wdl"), 
						rows=[{"id": "a", "nid": 4}])
				</code>
			</coreProc>
		</pythonCore>
	</service>
	%s
</resource>"""

class DatalinkInOutputTest(testhelpers.VerboseTest):
	def _getVOTTree(self, rdLiteral):
		rd = base.parseFromString(rscdesc.RD, rdLiteral)
		rd.sourceId = "internally/built"
		resTable = testhelpers.runSvcWith(
			rd.getById("grok"),
			"api", 
			{})
		votLiteral = votablewrite.getAsVOTable(resTable)
		return testhelpers.getXMLTree(votLiteral)

	def testDatalinkResourcePresent(self):
		tree = self._getVOTTree(_RD_WITH_DL%(
			'<column name="id" type="text"/><column name="nid"/>', ""))
		res = tree.xpath(
			"//RESOURCE[@type='meta' and @utype='adhoc:service']")[0]
		self.assertEqual(
			res.xpath("PARAM[@name='accessURL']")[0].get("value"),
			"http://localhost:8080/internally/built/grok/dlmeta")
		destId = res.xpath(
				"GROUP[@name='inputParams']/PARAM[@name='ID']")[0].get("ref")
		self.assertEqual(
			tree.xpath("//FIELD[@ID='%s']"%destId)[0].get("name"), "id")

	def testDatalinkResourceVanishes(self):
		tree = self._getVOTTree(_RD_WITH_DL%(
			'<column name="nid"/>', ""))
		self.assertEqual(tree.xpath(
			"//RESOURCE[@type='meta' and @utype='adhoc:service']"), [])

	def testTwoDatalinkResources(self):
		tree = self._getVOTTree(_RD_WITH_DL%(
			'<meta>_associatedDatalinkService:\n'
			'_associatedDatalinkService.serviceId:dl\n'
			'_associatedDatalinkService.idColumn:id</meta>'
			'<column name="id" type="text"/><column name="nid"/>',
			'<service id="dl" allowed="dlmeta"><datalinkCore/></service>'))
		dlRes = tree.xpath(
			"//RESOURCE[@type='meta' and @utype='adhoc:service']")
		self.assertEqual(len(dlRes), 2)
		self.assertEqual(
			dlRes[1].xpath("PARAM[@name='accessURL']")[0].get("value"),
			"http://localhost:8080/internally/built/dl/dlmeta")


class ColumnSelectionTest(
		testhelpers.VerboseTest, 
		metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		svcName, args, colNames = sample
		queryMeta = svcs.QueryMeta.fromRequestArgs(args)
		svc = testhelpers.getTestRD("cores").getById(svcName)
		found = set([c.name for c in svc.getCurOutputFields(queryMeta)])
		self.assertEqual(set(colNames), found)
	
	samples = [
		("coltest", {}, "abc"),
		("coltest0", {}, "abc"),
		("coltest", {"VERB": 3}, "abcd"),
		("coltest0", {"VERB": 3}, "abcde"),
		("coltest", {"VERB": 3, "RESPONSEFORMAT": "votable"}, "abde"),
# test05
		("coltestr", {"VERB": 3, "RESPONSEFORMAT": "votable"}, "abd"),
		("coltest", {"VERB": 1, "_ADDITEM": ["d", "f"]}, "abdf"),
		("coltest", {"VERB": 3, "_SET": "kl"}, "acd"),
		# output sets trigger respectOutputTable
		("coltest", {"VERB": 3, "_SET": "kl", "RESPONSEFORMAT": "votable"}, "ad"),
		# output sets honored with respectsOutputTable (but noxml still works)
		("coltestr", {"VERB": 3, "_SET": "kl", "RESPONSEFORMAT": "votable"}, "ad"),
# test10
		("coltest", {"VERB": 3, "_SET": "gr"}, "abd"),
		# additem is ignored in normal tabular requests...
		("coltest", {"VERB": 1, "_ADDITEM": ["d", "f"], "RESPONSEFORMAT": "votable"}, "ab"),
		# ...but not with VERB=HTML
		("coltest", {"VERB": "HTML", "_ADDITEM": ["d", "f"], 
			"RESPONSEFORMAT": "votable"}, "abdf"),
		("coltest", {"verbosity": 19}, "ab"),
		("coltest", {"VERB": 2, "_SET": "gr"}, "ab"),
	]


class TableSetTest(testhelpers.VerboseTest):
	def testFromCore(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test"><table id="foo"><column name="bar"/>
			</table>
			<service id="quux"><dbCore queriedTable="foo"/></service></resource>""")
		ts = rd.getById("quux").getTableSet()
		self.assertEqual(len(ts), 1)
		cols = ts[0].columns
		self.assertEqual(len(cols), 1)
		self.assertEqual(cols[0].name, "bar")

	def testOutputTable(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test">
			<service id="quux"><nullCore/><outputTable><outputField
			name="knotz"/></outputTable></service></resource>""")
		ts = rd.getById("quux").getTableSet()
		self.assertEqual(len(ts), 1)
		cols = ts[0].columns
		self.assertEqual(len(cols), 1)
		self.assertEqual(cols[0].name, "knotz")

	def testTAPTableset(self):
		rd = base.parseFromString(rscdesc.RD,
			"""<resource schema="test">
			<service id="quux" allowed="dali"><tapCore/></service></resource>""")
		ts = rd.getById("quux").getTableSet()
		self.assertTrue("tap_schema.tables" in [t.getQName() for t in ts])


class PythonCoreTest(testhelpers.VerboseTest):
	def testBasic(self):
		svc = base.resolveCrossId("data/cores#pc")
		res = testhelpers.runSvcWith(svc, "form", 
				_mkd(opre="1", opim=1, powers=["2 3 4"])
			).getPrimaryTable()
		self.assertEqual(res.rows[0]["re"], 0)
		self.assertEqual(res.rows[1]["im"], 2.0)
		self.assertAlmostEqual(res.rows[2]["log_value"], 1.3862943611198906)
		self.assertEqual(len(res.rows), 3)

	def testDefaulting(self):
		svc = base.resolveCrossId("data/cores#pc")
		res = testhelpers.runSvcWith(svc, "form", _mkd(opre="1", opim=1)
			).getPrimaryTable()
		self.assertEqual(len(res), 3)

	def testMissing(self):
		svc = base.resolveCrossId("data/cores#pc")
		self.assertRaisesWithMsg(base.ValidationError,
			"Field opre: Required parameter 'opre' missing.",
			testhelpers.runSvcWith,
			(svc, "form", _mkd(opim=1, powers=[2,3,4])))


class CustomCoreTest(testhelpers.VerboseTest):
	def testRunning(self):
		svc = base.resolveCrossId("data/cores#cc")
		res = testhelpers.runSvcWith(svc, "qp", _mkd(x="10"))
		self.assertEqual(res, ('text/plain', 'xxxxxxxxxx'))

	def testInputTable(self):
		core = base.resolveCrossId("data/cores#cc").core
		self.assertEqual(
			core.inputTable.inputKeys.getColumnByName("x").type, 
			"integer")

	def testInputTableWithRDDef(self):
		core = base.parseFromString(rscdesc.RD,
			"""<resource schema="data">
				<service id="u"><customCore module="testcore" id="hu">
				<inputTable>
					<inputKey name="y" type="text"/></inputTable></customCore>
			</service></resource>""").getById("hu")
		self.assertEqual(
			core.inputTable.inputKeys.getColumnByName("y").type, 
			"text")

	def testMissingModuleAttribute(self):
		self.assertRaisesWithMsg(
			base.StructureError,
			'At IO:\'<resource schema="data"> <service id="u"><customCore/...\','
			" (2, 33): Attribute module is mandatory",
			base.parseFromString,
			(rscdesc.RD,
			"""<resource schema="data">
				<service id="u"><customCore/></service></resource>"""))
	
	def testMissingModuleFile(self):
		self.assertRaisesWithMsg(
			base.StructureError,
			utils.EqualingRE('At IO:\'<resource schema="data"> <service id="u"><customCore ...\', \(2, 48\): Cannot load custom core .*/_gavo_test/inputs/data/knenk: \[Errno 2\] No such file or directory: \'.*/_gavo_test/inputs/data/knenk.py\''),
			base.parseFromString,
			(rscdesc.RD,
			"""<resource schema="data">
				<service id="u"><customCore module="knenk"/></service></resource>"""))

	def testBadModuleFile(self):
		with testhelpers.testFile(
				"borken.py", 
				"return\n", 
				inDir=base.getConfig("inputsDir")+"/data"):
			self.assertRaisesWithMsg(
				base.StructureError,
				utils.EqualingRE('At IO:\'<resource schema="data"> <service id="u"><customCore ...\', \(2, 50\): Cannot load custom core .*/_gavo_test/inputs/data/borken: \'return\' outside function \(borken.py, line 1\)'),
				base.parseFromString,
				(rscdesc.RD,
				"""<resource schema="data">
					<service id="u"><customCore module="borken"/>
					</service></resource>"""))


class HumanCoordParseTest(testhelpers.VerboseTest, 
		metaclass=testhelpers.SamplesBasedAutoTest):
	resources = [("fs", tresc.fakedSimbad)]

	def _runTest(self, sample):
		literal, parsed = sample
		self.assertEqual(scs.parseHumanSpoint(literal), parsed)
	
	samples = [
		("23.5, -21.75", (23.5, -21.75)),
		("23.5 -21.75", (23.5, -21.75)),
		("23 30, 11 15 30.6", (352.5, 11.2585)),
		("23:30:45, 11:15:30.6", (352.6875, 11.2585)),
		("Aldebaran",  (68.9375, 16.46875)),]

	def testException(self):
		self.assertRaisesWithMsg(base.ValidationError,
			"Unidentified Field: $&& zefixx is neither a RA,DEC pair nor"
			" a simbad resolvable object.",
			scs.parseHumanSpoint,
			("$&& zefixx",))


class VOSITablesTest(testhelpers.VerboseTest, testhelpers.XSDTestMixin):
	resources = [("taptable", tresc.tapPublishedADQLTable),
		('oc', tresc.obscoreTable),
		('conn', tresc.dbConnection)]

	def _getResponse(self, serviceId=None, path="", **args):
		baseService = base.resolveCrossId(serviceId or "//tap#run")
		req = testhelpers.FakeRequest(path, args=args)
		rend = vosi.VOSITablesetRenderer(req, baseService)

		with _syncVOSI():
			rawVOT = req.processWithRoot(rend)

		return rawVOT, testhelpers.getXMLTree(rawVOT, debug=False)
	
	def testNoGrandchildren(self):
		self.assertRaisesWithMsg(svcs.UnknownURI,
			"VOSI tables resources have no children",
			self._getResponse,
			(None, "ivoa.obscore/long"))

	def testNoUnrelatedTables(self):
		self.assertRaisesWithMsg(svcs.UnknownURI,
			"No table ivoa.obscore on this service.",
			self._getResponse,
			("data/test#basicprod", "ivoa.obscore"))

	def testTableAccess(self):
		doc, tree = self._getResponse(None, "ivoa.obscore")
		self.assertValidates(doc)
		self.assertEqual(tree.xpath("name")[0].text, "ivoa.obscore")
		self.assertEqual(len(tree.xpath("name")), 1)
		self.assertEqual(
			tree.xpath("//column[name='dataproduct_type']/ucd")[0].text,
			"meta.code.class")

	def testNormalService(self):
		doc, tree = self._getResponse("data/test#basicprod")
		self.assertEqual(len(tree.xpath("schema/name")), 1)
		self.assertEqual(tree.xpath("schema/name")[0].text, "test")
		self.assertEqual(len(tree.xpath("schema/table/name")), 1)
		self.assertEqual(tree.xpath("schema/table/name")[0].text, "test.prodtest")
		self.assertEqual(
			tree.xpath("//column[name='delta']/ucd")[0].text,
			"pos.eq.dec")
		datatype = tree.xpath("//column[name='delta']/dataType")[0]
		self.assertEqual(datatype.get("arraysize"), None)
		self.assertEqual(
			datatype.get("{http://www.w3.org/2001/XMLSchema-instance}type"), 
			"vs:VOTableType")

	def testMinimalDetail(self):
		doc, tree = self._getResponse("data/test#basicprod", detail=["min"])
		self.assertEqual(len(tree.xpath("schema/name")), 1)
		self.assertEqual(tree.xpath("schema/name")[0].text, "test")
		self.assertEqual(len(tree.xpath("schema/table/name")), 1)
		self.assertEqual(tree.xpath("schema/table/name")[0].text, "test.prodtest")
		self.assertEqual(len(tree.xpath("//column")), 0)

	def testNoHiddenColumns(self):
		doc, tree = self._getResponse(path="test.csdata")
		self.assertEqual(tree.xpath("column[name='rv']/description")[0].text,
			"A sample radial velocity")
		self.assertFalse(b"not be publicly visible" in doc)


class _VOSICapResponse(testhelpers.TestResource):
	def make(self, deps):
		with _syncVOSI():
			req = testhelpers.FakeRequest("")
			rend = vosi.VOSICapabilityRenderer(req,
				base.resolveCrossId("data/cores#dl"))
			rawVOT = req.processWithRoot(rend)
			return rawVOT, testhelpers.getXMLTree(rawVOT, debug=False)
		

class VOSICapabilitiesTest(testhelpers.VerboseTest, testhelpers.XSDTestMixin):
	resources = [("respAndTree", _VOSICapResponse())]

	def testValid(self):
		self.assertValidates(self.respAndTree[0])

	def testVOSIDeclared(self):
		self.assertEqual(self.respAndTree[1].xpath(
			"//capability[@standardID='ivo://ivoa.net/std/VOSI#tables']/interface"
			"/accessURL")[0].text,
			"http://localhost:8080/data/cores/dl/tableMetadata")

	def testExamplesDeclared(self):
		exInt = self.respAndTree[1].xpath(
			"//capability[@standardID='ivo://ivoa.net/std/DALI#examples']"
			"/interface")[0]

		self.assertEqual(
			exInt.get("{http://www.w3.org/2001/XMLSchema-instance}type"), 
			"vr:WebBrowser")

	def testOnlyOneResponseformat(self):
		self.assertEqual(len(self.respAndTree[1].xpath(
			"//param[name='responseformat']")), 1)

	def testIDParamDeclared(self):
		self.assertEqual(
			self.respAndTree[1].xpath("//param[name='ID']")[0].get("std"),
			"true")


class _MetaElement(nevowc.NevowcElement):
	def __init__(self, stan):
		self.loader = template.TagLoader(stan)

	@template.renderer
	def ifadmin(self, request, tag):
		return tag

	@template.renderer
	def meta(self, request, tag):
		return tag


class RendExplainerTest(testhelpers.VerboseTest, 
		metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, item):
		rendName, fragments = item
		stan = metarender.RendExplainer.explain(rendName, 
			base.resolveCrossId("//services#root"))

		rendered = formal.flattenSync(
			_MetaElement(stan)).decode("utf-8")
		for frag in fragments:
			self.assertTrue(frag in rendered, 
				"'%s' missing in '%s'"%(frag, rendered))

	samples = [
		("knall", ["some custom access method", "mentioned in the"]),
		("form", ["interface for web browsers", 
			'<a href="http://localhost:8080/__system__/services/root/form']),
		("fixed", ["custom page",
			'href="http://localhost:8080/__system__/services/root/fixed']),
		("volatile", ["custom page",
			'href="http://localhost:8080/__system__/services/root/volatile']),
# soap is currently out for lack of support in twisted.  Let's see
# where this goes.
#		("soap", ["slightly aged", "quokka"]),
		("custom", [" typically for interactive web applications"]),
		("static", ["probably used to access ancillary files here"]),
		("text", ["a text interface not intended for user"]),
		("siap.xml", ["SIAP clients", 
			" http://localhost:8080/__system__/services/root/siap.xml? ", 
			'=ALL">Validate<']),
		("scs.xml", ["SCS clients", 
			" http://localhost:8080/__system__/services/root/scs.xml? ", 
			'VERB=3">Validate<']),
		("ssap.xml", ["SSAP clients", 
			" http://localhost:8080/__system__/services/root/ssap.xml? ", 
			'ssap.xml%3F">Validate<']),
		("slap.xml", ["SLAP clients", 
			" http://localhost:8080/__system__/services/root/slap.xml? ", 
			'slap.xml%3F">Validate<']),
		("dali", ["a complex",
			'<code>identifier</code>',
			'http://localhost:8080/__system__/services/root/async">the service']),
		("uws.xml", ["or libraries",
			'href="http://localhost:8080/__system__/services/root/async"']),
		("pubreg.xml", ["site's publishing registry",
			'href="http://localhost:8080/__system__/services/root/pubreg.xml"']),
		("qp", ["to query the column defunct in the underlying"]),
		("api", ["low-level tools", 
			' href="http://localhost:8080/__system__/services/root/api?MAXREC=0"']),
		("coverage", ["retrieve the spatial coverage"]),
	]


def _renderWithForm(svc):
	req = testhelpers.FakeRequest("")
	rend = formrender.FormRenderer(req, svc)
	return nevowc.flattenSync(rend._getDoc(req), req)


class WebRelatedConfigTest(testhelpers.VerboseTest):
	def testOperatorCSS(self):
		with testhelpers.tempConfig(
				("web", "operatorCSS", "/static/foo.css")):
			doc = _renderWithForm(
				testhelpers.getTestRD("cores").getById("convcat"))
			self.assertTrue(
				b'<link rel="stylesheet" type="text/css" href="/static/foo.css" />'
				in doc)


def _getTableInfoFor(rdLit):
	req = testhelpers.FakeRequest("")
	rd = base.parseFromString(rscdesc.RD, rdLit)
	rend = metarender.TableInfoRenderer(req, 
		base.resolveCrossId("//services#overview"))
	rend.tableName="test.t"
	rend.table = rd.getById("t")
	rend.describingRD = rd
	res = rend.render(req)
	return req.accumulator.decode("utf-8")


class TableInfoRenderTest(testhelpers.VerboseTest):
	def testTableLongdoc(self):
		html = _getTableInfoFor("""<resource schema="test">
			<table id="t"><meta name="_longdoc">This table sucks</meta>
			<column name="foo"/>
			</table></resource>""")

		self.assertTrue('<h2 id="ti-tabledoc" class="section">Table Documentation</h2>' in html)
		self.assertTrue('<span class="plainmeta">This table sucks</span>' in html)

		self.assertFalse("Resource Documentation" in html)
	
	def testResourceLongdoc(self):
		html = _getTableInfoFor("""<resource schema="test">
			<meta name="_longdoc" format="rst">This *resource* sucks</meta>
			<table id="t">
			<column name="foo"/>
			</table></resource>""")

		self.assertTrue('<h2 id="ti-longdoc" class="section">Resource Documentation</h2>' in html)
		self.assertTrue('<p>This <em>resource</em> sucks</p>' in html)

		self.assertFalse('Table Documentation' in html)

	def testBothLongdoc(self):
		html = _getTableInfoFor("""<resource schema="test">
			<meta name="_longdoc" format="rst">This *resource* sucks</meta>
			<table id="t"><meta name="_longdoc">This table sucks</meta>
			<column name="foo"/>
			</table></resource>""")

		self.assertTrue('<h2 id="ti-longdoc" class="section">Resource Documentation</h2>' in html)
		self.assertTrue('<p>This <em>resource</em> sucks</p>' in html)
		self.assertTrue('<h2 id="ti-tabledoc" class="section">Table Documentation</h2>' in html)
		self.assertTrue('<span class="plainmeta">This table sucks</span>' in html)


if __name__=="__main__":
	testhelpers.main(ComputedServiceTest)
