# -*- coding: utf-8 -*-
"""
Tests having to do with various output formats.
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import base64
import datetime
import io
import json
import os
import re
import warnings
from xml.etree import ElementTree as etree

from gavo.helpers import testhelpers

from gavo import base
from gavo import formal
from gavo import formats
from gavo import rsc
from gavo import rscdef
from gavo import rscdesc
from gavo import svcs
from gavo import utils
from gavo import votable
from gavo import web #noflake: for registration
from gavo.base import valuemappers #noflake: for registration
from gavo.formats import csvtable #noflake: for registration
from gavo.formats import fitstable #noflake: for registration
from gavo.formats import geojson #noflake: for registration
from gavo.formats import jsontable #noflake: for registration
from gavo.formats import texttable #noflake: for registration
from gavo.formats import votablewrite #noflake: for registration
from gavo.svcs import outputdef
from gavo.utils import pgsphere
from gavo.utils import pyfits
from gavo.web import htmltable


_colDefs = {
	"klein": '<column name="klein"  type="smallint">'
		'<values nullLiteral="-1"/></column>',
	"prim": '<column name="prim" type="integer" required="true"'
		' description="Some random primary key"/><primary>prim</primary>',
	"nopt": '<column name="nopt" type="real" required="True"/>',
	"echter": '<column name="echter" type="double precision"/>',
	"datum": '<column name="datum" type="date"/>',
	"indf": '<column name="indf" type="text"/><index columns="indf"/>',
	"uu": '<column name="uu" type="unicode"/>'}


def _getFields(*args):
	return [_colDefs[a] for a in args]


class ResolutionTest(testhelpers.VerboseTest):
	def testResolutionUnknownKey(self):
		self.assertRaisesWithMsg(formats.CannotSerializeIn,
			"Cannot serialize in 'laberbrasel/td'.",
			formats.getWriterFor,
			("laberbrasel/td",))
	
	def testResolutionKey(self):
		self.assertEqual(formats.getWriterFor("tsv"),
			texttable.renderAsText)

	def testResolutionMimeWithBlanks(self):
		self.assertEqual(
			formats.getWriterFor("text/csv; header = present").__name__,
			"<lambda>")
	
	def testMimeGetting(self):
		self.assertEqual(
			formats.getMIMEFor("votableb2"),
			"application/x-votable+xml;serialization=BINARY2")

	def testIterFormats(self):
		labels = list(formats.iterFormats())
		self.assertTrue("votableb2", labels)
		self.assertTrue("tsv", labels)


class FITSWriterTest(testhelpers.VerboseTest):
	def _makeRD(self, colNames, rd=None):
		return base.parseFromString(rscdesc.RD,
			"""<resource resdir="%s" schema="test">
			<data id="randomTest">
				<dictlistGrammar/>
				<table id="foo">
					%s
				</table>
				<rowmaker id="_foo" idmaps="%s"/>
				<make table="foo" rowmaker="_foo"/>
			</data>
		</resource>
		"""%(os.path.abspath("."), "\n".join(_getFields(*colNames)),
			",".join(colNames)))

	_testData = [{"klein": 1, "prim": 2, "nopt": 4.25, "indf": "CEN A"},
				{"klein": 2, "prim": 7, "nopt": 9.32, "indf":
					"QSO 3248+33 Component Gamma"}]

	def testMakeSimpleTable(self):
		"""tests for creation of a simple FITS table.
		"""
		rd = self._makeRD(["klein", "prim", "nopt", "indf"])
		dataSet = rsc.makeData(rd.getById("randomTest"),
			forceSource=self._testData)
		hdulist = fitstable.makeFITSTable(dataSet)
		self.assertEqual(len(hdulist), 2, "Primary or extension hdu missing")
		self.assertTrue("DATE" in hdulist[0].header, "No DATE keyword"
			" in primary header")
		ft = hdulist[1].data
		self.assertEqual(ft.field("klein")[0], 1)
		self.assertEqual(ft.field("prim")[1], 7)
		self.assertEqual(ft.field("nopt")[0], 4.25)
		self.assertEqual(ft.field("indf")[1], "QSO 3248+33 Component Gamma")
		self.assertEqual(len(hdulist[1].columns), 4)
		self.assertEqual(hdulist[1].columns[3].format, "27A")
	
	def testMakeDoubleTable(self):
		"""tests for creation of a two-extension FITS table.
		"""
		rd = self._makeRD(("klein", "prim", "nopt", "indf"))
		rec2 = base.parseFromString(rscdef.TableDef,
			'<table id="part2">%s</table>'%"".join(_getFields("prim", "nopt")))
		dd = rd.getById("randomTest")
		dd.tables.append(rec2)
		dd.makes.append(base.makeStruct(rscdef.Make, table=rec2))
		dd.finishElement()
		dataSet = rsc.makeData(dd, forceSource=self._testData)
		hdulist = fitstable.makeFITSTable(dataSet)
		self.assertEqual(len(hdulist), 3, "Exporting composite data sets"
			" doesn't catch additional tables")

	def testTableWrite(self):
		rd = self._makeRD(("klein", "prim", "nopt", "indf"))
		dataSet = rsc.makeData(rd.getById("randomTest"),
			forceSource=self._testData)
		fName = fitstable.makeFITSTableFile(dataSet)
		self.assertTrue(os.path.exists(fName), "makeFITSTableFile doesn't"
			" create the file it says it creates")
		hdulist = pyfits.open(fName)
		self.assertEqual(len(hdulist), 2, "makeFITSTableFile wrote"
			" weird file")
		os.unlink(fName)

	def testSomeNullvalues(self):
		rd = self._makeRD(("klein", "echter", "indf"))
		dataSet = rsc.makeData(rd.getById("randomTest"),
			forceSource=[{"klein": None, "echter": None, "indf": None}])
		hdulist = fitstable.makeFITSTable(dataSet)
		resTup = hdulist[1].data[0]
		self.assertEqual(resTup[0], -1)
		self.assertFalse(resTup[1]==resTup[1]) # "isNan"
		self.assertEqual(resTup[2], "")

	def testUnicodishStuff(self):
		rd = self._makeRD(("indf", "uu"))
		dataSet = rsc.makeData(rd.getById("randomTest"),
			forceSource=[{"indf": "Räuber", "uu": "Räuber"},
				{"uu": "ja"*10, "indf": "ja"*40}])
		hdulist = fitstable.makeFITSTable(dataSet)
		resTup = hdulist[1].data[0]
		self.assertEqual(resTup[0], "R?uber")
		self.assertEqual(resTup[1], "R?uber")
		self.assertEqual(hdulist[1].columns[0].format, "80A")
		# the following should probably grow once we support some sort
		# of unicode in fits binary tables.
		self.assertEqual(hdulist[1].columns[1].format, "20A")


class _TestDataTable(testhelpers.TestResource):
	"""A fairly random table with mildly challenging data.
	"""
	def make(self, deps):
		dd = testhelpers.getTestRD().getById("tableMaker")
		data = rsc.makeData(dd, forceSource=[
			(1, -2, 3, "Wäre es da nicht besser,\n die Regierung setzte das Volk"
				" ab\tund wählte ein anderes?", '2004-05-05',
				pgsphere.SPoint.fromDegrees(124, -30)),
			(None, None, None, None, None, None)])
		return data

_testDataTable = _TestDataTable()


class _FormattedData(testhelpers.TestResource):
	"""_testDataTable after being pushed through formats.getFormatted.

	If textify is true, the result will be utf-8 decoded and returned
	as a string so it's nicer to write tests.
	"""
	resources = [("data", _testDataTable)]

	def __init__(self, destFormat, debug=False, textify=True):
		self.destFormat = destFormat
		self.debug = debug
		self.textify = textify
		testhelpers.TestResource.__init__(self)

	def make(self, dependencies):
		with warnings.catch_warnings():
			warnings.simplefilter("ignore")
			output = formats.getFormatted(self.destFormat, dependencies["data"])
		if self.debug:
			with open("generated.dat", "wb") as f:
				f.write(output)
		if self.textify:
			return output.decode("utf-8")
		else:
			return output


class _FormattedAndParsedData(testhelpers.TestResource):
	"""_testDataTable serialised and deserialized again through loads.

	What's coming back is (result-string, result-data).
	"""
	resources = [("data", _testDataTable)]

	def __init__(self, destFormat, loads, debug=False, textify=True):
		self.destFormat = destFormat
		self.debug = debug
		self.loads = loads
		self.textify = textify
		testhelpers.TestResource.__init__(self)

	def make(self, dependencies):
		data = _FormattedData(self.destFormat, self.debug, textify=False
			).make(dependencies)
		parsed = self.loads(data)
		if self.textify:
			data = data.decode("utf-8")
		return data, parsed


class _JSONTable(testhelpers.TestResource):
	"""A table that went through JSON serialisation.

	The resource is (json-text, decoded-json).
	"""
	resources = [("data", _testDataTable)]

	def make(self, deps):
		jsText = formats.getFormatted("json", deps["data"])
		decoded = json.loads(jsText)
		return jsText, decoded


class JSONOutputTest(testhelpers.VerboseTest):
	resources = [("tAndD", _FormattedAndParsedData('json', json.loads))]

	def testColumns(self):
		c1, c2, c3, c4, c5, c6 = self.tAndD[1]["columns"]
		self.assertEqual(c1["datatype"], "int")
		self.assertEqual(c2["name"], "afloat")
		self.assertEqual(c3["arraysize"], None)
		self.assertEqual(c4["description"], 'Just by a \xb5.')
		self.assertEqual(c5["datatype"], "double")
		self.assertEqual(c6["datatype"], "double")

	def testContains(self):
		self.assertEqual(self.tAndD[1]["contains"], "table")
	
	def testData(self):
		self.assertEqual(self.tAndD[1]["data"], [
			[1, -2.0, 3.0, 'W\xe4re es da nicht besser,\n'
				' die Regierung setzte das Volk ab\tund w\xe4hlte ein anderes?',
				2453130.5, list(pgsphere.SPoint.fromDegrees(124., -30.).asDALI())],
			[None, None, None, None, None, None]])

	def testParameterDescription(self):
		intPar = self.tAndD[1]["params"]["intPar"]
		self.assertEqual(intPar["value"], 42)
		self.assertEqual(intPar["description"], "test integer parameter")
	
	def testParameterMetadata(self):
		floatPar = self.tAndD[1]["params"]["exactFloatPar"]
		self.assertEqual(floatPar["value"], 0.25)
		self.assertEqual(floatPar["unit"], "m")
		self.assertEqual(floatPar["ucd"], "phys.width")

	def testMetaInfo(self):
		dd = testhelpers.getTestRD().getById("tableMaker")
		data = rsc.makeData(dd, forceSource=[
			(1, 2, 3, "ei", '2004-05-05')])
		table = data.getPrimaryTable()
		table.addMeta("_warning", "Warning 1")
		table.addMeta("_warning", "Warning 2")
		table.addMeta("_queryStatus", "OK")
		afterRoundtrip = json.loads(
			formats.getFormatted("json", data))
		self.assertEqual(afterRoundtrip["warnings"],
			["Warning 1", "Warning 2"])
		self.assertEqual(afterRoundtrip["queryStatus"],
			"OK")


class FormatOutputTest(testhelpers.VerboseTest):
	"""a base class for tests against formatted output.
	"""
	# extra pain to allow both _FormattedData and _FormattedAndPartData
	# in output, plus unwrapping stuff wrapped by testhelpers.
	def _getOutput(self):
		output = getattr(self.output, "original", self.output)
		if isinstance(output, tuple):
			# it's a tuple wrapped by testresources
			return output[0]
		else:
			return output

	def assertOutputHas(self, fragment):
		try:
			self.assertTrue(fragment in self._getOutput(),
				"'%s' not found in output"%utils.debytify(fragment))
		except AssertionError:
			with open("generated.dat", "wb") as f:
				f.write(utils.bytify(self._getOutput()))
			raise

	def assertOutputHasNot(self, fragment):
		self.assertFalse(fragment in self._getOutput(),
			"Forbidded string '%s' found in output"%fragment)


class TextOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("txt", debug=False))]
	
	def testNullSerialization(self):
		self.assertEqual(self.output.split('\n')[1],
			'N/A   N/A  N/A                                                                                   N/A        N/A                          N/A')
	def testValues(self):
		self.assertEqual(self.output.split("\n")[0],
			"  1  -2.0  3.0  Wäre es da nicht besser,\\n die Regierung setzte das Volk ab\\tund wählte ein anderes?  2453130.5  [124.0 -29.999999999999996]")


class TSVColumnsOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("tsv"))]
	
	def testNullSerialization(self):
		self.assertEqual(self.output.split('\n')[1],
			'N/A\tN/A\tN/A\tN/A\tN/A\tN/A')
	
	def testValues(self):
		self.assertEqual(self.output.split("\n")[0],
			"1\t-2.0\t3.0\tWäre es da nicht besser,\\n die Regierung setzte"
				" das Volk ab\\tund wählte ein anderes?\t2453130.5\t"
				"[124.0 -29.999999999999996]")


class CSVOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("text/csv"))]

	def testNumerics(self):
		self.assertTrue(self.output.startswith('1,-2.0,3.0,'))
		
	def testStringDefusion(self):
		self.assertOutputHas(',"Wäre es da nicht'
			' besser,\\n die Regierung setzte das Volk ab\\tund wählte'
			' ein anderes?",')
	
	def testNULLs(self):
		self.assertEqual(self.output.split("\r\n")[1], ',,,,,')
	
	def testWeirdTypes(self):
		self.assertOutputHas('2453130.5,"(124.0, -29.999999999999996)"')


class CSVHeaderOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("text/csv; header=present", False))]

	def testWeirdTypes(self):
		self.assertOutputHas("(124.0, -29.999999999999996)")

	def testParamCommentWritten(self):
		self.assertOutputHas('# intPar = 42 // test integer parameter')

	def testParamStringValueUsed(self):
		self.assertOutputHas('# roughFloatPar = 0.3 // \r\n')

	def testNullParamNotWritten(self):
		self.assertOutputHasNot('stringPar')

	def testHeaderPresent(self):
		self.assertEqual(self.output.split("\r\n")[3],
			"anint,afloat,adouble,atext,adate,apos")


class DefaultVOTableOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("votable", False))]

	def testTableDescriptionPickedUp(self):
		self.assertOutputHas('<DESCRIPTION>Some test data with a reason')
	
	def testUnicodeDeclared(self):
		self.assertOutputHas('datatype="unicodeChar"')
	
	def testEncodingInDescription(self):
		self.assertOutputHas('Just by a \xb5')

	def testDataEncoded(self):
		self.assertOutputHas(
			'<STREAM encoding="base64">AAAAAcAAAABACAAAAAAAAAAAAF')

	def testParam(self):
		self.assertOutputHas("<PARAM")
		self.assertOutputHas('name="roughFloatPar"')
		self.assertOutputHas('value="0.3"')
		self.assertOutputHas(' ucd="phys.width"')


class TDVOTableOutputTest(DefaultVOTableOutputTest):
	resources = [("output", _FormattedData("votabletd", False))]

	def testDataEncoded(self):
		self.assertOutputHas("<DATA><TABLEDATA>")
		self.assertOutputHas("<TD>Wäre es da nicht besser,")
	
	def testNullSerialization(self):
		self.assertOutputHas("<TR><TD>-8888</TD><TD>NaN</TD><TD>NaN</TD>"
			"<TD></TD><TD>NaN</TD><TD></TD></TR>")
		self.assertOutputHas('null="-8888"')


class B2VOTableOutputTest(DefaultVOTableOutputTest):
	resources = [("output", _FormattedAndParsedData(
		"votableb2", votable.loads))]

	def testDataEncoded(self):
		self.assertOutputHas("<DATA><BINARY2>")
		self.assertOutputHas('<STREAM encoding="base64">AAAAAAHAAAAAQAg'
			'AAAAAAAAAAABSAFcA5AByAGUA')
	
	def testNULLsDecoded(self):
		data, metadata = self.output[1]
		self.assertEqual(data[1], [None, None, None, None, None, None])
	
	def testGeometry(self):
		# XXX TODO: actually parse this
		data, metadata = self.output[1]
		self.assertEqual(data[0][5], pgsphere.SPoint.fromDegrees(124., -30.))

	def testRegionDeclared(self):
		data, metadata = self.output[1]
		self.assertEqual(metadata[5].xtype, "point")


class V11VOTableOutputTest(DefaultVOTableOutputTest):
	resources = [("output", _FormattedData("votabletd1.1"))]

	def testDataEncoded(self):
		self.assertOutputHas("<DATA><TABLEDATA>")
		self.assertOutputHas("<TD>Wäre es da nicht besser,")

	def testNamespace(self):
		self.assertOutputHas('version="1.1"')
		self.assertOutputHas('xmlns="http://www.ivoa.net/xml/VOTable/v1.1"')

	def testXtypeSwallowed(self):
		self.assertFalse("xtype" in self.output)


class FITSOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("fits", debug=False, textify=False))]

	def testIsFITS(self):
		self.assertOutputHas(b'SIMPLE  =                    T')
	
	def testBintablePresent(self):
		self.assertOutputHas(b"XTENSION= 'BINTABLE'")
	
	def testColumnMapping(self):
		self.assertOutputHas(b"TTYPE2  = 'afloat  '")
		self.assertOutputHas(b"TTYPE3  = 'adouble '")
		self.assertOutputHas(b"TFORM3  = 'D       '")

	def testDescriptionPresent(self):
		self.assertOutputHas(b"TCOMM3  = 'This is a complex beast so it needs"
			b" a description that requires &'   CONTINUE  'continuatio= n cards "
			b"in FITS binary tables.'")

	def testSimplePar(self):
		self.assertOutputHas(
			b"INTPAR  =                   42 / test integer parameter")
	
	def testHierarchPar(self):
		self.assertOutputHas(
			b"HIERARCH exactFloatPar = 0.25 / This can be exactly represented"
				b" in two's complemEND")
	
	def testDataEncoded(self):
		self.assertOutputHas(b"W?re es da nic")
		self.assertOutputHas(b'\x00\x00\x00\x01')

	def testNULLsEncoded(self):
		self.assertEqual(self.output[0x16fa:0x16fc+2+4+8+82+8+8+8],
			b"\xff\xff\xdd\x48"
			b"\x7f\xc0\x00\x00"
			b"\x7f\xf8\x00\x00\x00\x00\x00\x00"
			+b"\x00"*82
			+b"\x7f\xf8\x00\x00\x00\x00\x00\x00"
			+b"\x7f\xf8\x00\x00\x00\x00\x00\x00\x7f\xf8\x00\x00\x00\x00\x00\x00")

	def testCharWithNull(self):
		td = base.parseFromString(rscdef.TableDef, """<table id="t">
			<column name="c" type="char"><values nullLiteral="."/>
			</column></table>""")
		rendered = formats.getFormatted("fits",
			rsc.TableForDef(td, rows=[{"c": None}, {"c": "x"}]))
		self.assertTrue(b"XTENSION= 'BINTABLE" in rendered,
			"Not a bintable?")
		self.assertTrue(b"\0x\0\0\0" in rendered,
			"Char NULLs not serialised?")



def _getHTMLTree(columns, rows):
	td = base.parseFromString(rscdef.TableDef, """<table id="t">
		%s</table>"""%columns)
	t = rsc.TableForDef(td, rows=rows)
	rendered = formats.getFormatted("html", t)
	return testhelpers.getXMLTree(rendered)


class HTMLOutputTest(FormatOutputTest):
	resources = [("output", _FormattedData("html"))]

	def testTableDeclared(self):
		self.assertOutputHas('<table class="results">')
	
	def testMetadataWritten(self):
		self.assertOutputHas('Real</th><th ')
	
	def testNormalData(self):
		self.assertOutputHas(
			'<tr class="data"><td>1</td><td>-2.0</td><td>3.0</td><td>')
		self.assertOutputHas('Wäre es da nicht besser,')
		self.assertOutputHas('wählte ein anderes?</td><td>2453130.5')
	
	def testNULLs(self):
		self.assertOutputHas('<td>N/A</td><td>N/A</td>')

	def testCheckmarks(self):
		tree = _getHTMLTree(
			'<column name="t" type="integer" displayHint="type=checkmark"/>',
			[{"t": 1}, {"t":0}, {"t":None}])
		self.assertEqual(
			[t.text for t in tree.xpath("//td")],
			['✓', None, None])

	def testImageURL(self):
		tree = _getHTMLTree(
			'<column name="t" type="text" displayHint="type=imageURL,width=20"/>',
			[{"t": "http://foo"}, {"t":None}])
		fields = tree.xpath("//td")
		img = fields[0][0]
		self.assertEqual(img.text, None)
		self.assertEqual(img.get("alt"), "Image at http://foo")
		self.assertEqual(img.get("width"), "20")
		self.assertEqual(img.get("src"), "http://foo")
		self.assertEqual(len(fields[1]), 0)

	def testNullHandledMisc(self):
		tree = _getHTMLTree(
			'<column name="h" type="text" displayHint="type=keephtml"/>'
			'<column name="s" type="text" displayHint="type=simbadlink"/>'
			'<column name="b" type="text" displayHint="type=bar"/>',
			[{"h": None, "s": None, "b": None}])
		self.assertEqual(
			[etree.tostring(m) for m in tree.xpath("//td")],
			[b"<td />", b"<td />", b"<td />"])
	
	def testProductMapperNopreview(self):
		tree = _getHTMLTree(
			'<column name="h" type="text" displayHint="type=product,nopreview=T"/>',
			[{"h": "http://foo/image.fits"}, {"h": "gack/image.fits"},
				{"h": None}, {"h": "abc"}])
		fields = tree.xpath("//td")

		a = fields[0].xpath("a")[0]
		self.assertEqual(a.get("href"), "http://foo/image.fits")
		self.assertEqual(a.text, "image.fits")

		prod = fields[1].xpath("a")[0]
		self.assertEqual(prod.get("class"), "productlink")
		self.assertEqual(prod.get("href"),
			"http://localhost:8080/getproduct/gack/image.fits")

		self.assertEqual(len(fields[2]), 0)

		self.assertEqual(fields[3].xpath("a")[0].text, "File")


class FormatDataTest(testhelpers.VerboseTest):
	resources = [("data", _testDataTable)]

	def testRaising(self):
		with open("/dev/null") as dest:
			self.assertRaises(formats.CannotSerializeIn, formats.formatData,
				"wabbadubba", self.data, dest)


class _ExplicitNullTestTable(testhelpers.TestResource):
	"""A table having some types with explicit null values and an all-null row.
	"""
	def make(self, deps):
		td = base.parseFromString(rscdef.TableDef,
			"""
			<table id="nulls">
				<column name="anint" type="integer"><values nullLiteral="-1"/></column>
				<column name="atext" type="text"><values nullLiteral="xxy"/></column>
				<column name="fixtx" type="char(7)"><values nullLiteral="xxy"/></column>
			</table>
			""")
		return rsc.TableForDef(td,
			rows=[dict((col.name, None) for col in td)])


_explicitNullTestTable = _ExplicitNullTestTable()

class ExplicitNullValueTest(testhelpers.VerboseTest):
	resources = [("nullsTable", _explicitNullTestTable)]

	def _runTestForFormat(self, formatName, assertion):
		destF = io.BytesIO()
		formats.formatData(formatName, self.nullsTable, destF)
		assertion(destF.getvalue())

	def testHTML(self):
		def assertion(data):
			self.assertEqual(
				re.search(b'<tr class="data">(<td>.*)</tr>', data).group(0),
					b'<tr class="data"><td>N/A</td><td>N/A</td><td>N/A</td></tr>')
		self._runTestForFormat("html", assertion)
	
	def testTDVOTable(self):
		# there's votabletest exercising this more thoroughly, but while we
		# are at it...
		def assertion(data):
			self.assertTrue(b'<VALUES null="-1"' in data)
			self.assertTrue(b'<TR><TD>-1</TD><TD></TD><TD></TD></TR>' in data)
		self._runTestForFormat("votabletd", assertion)

	def testBinVOTable(self):
		def assertion(data):
			self.assertTrue(b'<VALUES null="-1"' in data)
			self.assertTrue(b'<VALUES null="xxy"' in data)
			decoded = base64.b64decode(re.search(
				b'(?s)<STREAM encoding="base64">(.*)</STREAM>',
				data).group(1))
			self.assertEqual(decoded, b"".join([
				b'\xff\xff\xff\xff',
				b'\x00\x00\x00\x03xxy',
				b'xxy    ',]))
		self._runTestForFormat("votable", assertion)

	def testCSV(self):
		def assertion(data):
			self.assertEqual(b",,", data.strip())
		self._runTestForFormat("text/csv", assertion)

	def testTSV(self):
		def assertion(data):
			self.assertEqual(b'N/A\tN/A\tN/A', data.strip())
		self._runTestForFormat("tsv", assertion)


def _mkr(i, s, d, dd, obl, st):
	return locals()


HTML_OUTPUT_DEF = """
<outputTable id="foo">
	<outputField name="i" type="integer" required="True" description="Ha!"
		unit="Msol"/>
	<outputField name="s" type="text" description="some string &lt;"/>
	<outputField name="d" type="timestamp" tablehead="date"/>
	<outputField name="dd" type="timestamp" tablehead="Ja"
		note="junk"/>
	<outputField name="obl" type="text">
		<formatter>
			if data is None:
				return None
			return "&amp;"+data+"&lt;"
		</formatter>
	</outputField>
	<outputField name="st" type="text" wantsRow="True">
		<formatter>
			if data["i"] is None:
				return None
			return T.a(href=str(2*data["i"]))[str(data["obl"])]
		</formatter>
	</outputField>
	<meta name="note" tag="junk">
		This column only here for no purpose at all
	</meta>
</outputTable>
"""

class _RenderedHTML(testhelpers.TestResource):
	def make(self, ignored):
		td = base.parseFromString(outputdef.OutputTableDef, HTML_OUTPUT_DEF)
		table = rsc.TableForDef(td, rows=[
			_mkr(1, "Hnä",
				datetime.datetime(2005, 4, 3, 2, 1),
				datetime.datetime(2005, 4, 3, 2, 22), "gurke", None),
			_mkr(None, None, None, None, None, None)])
		destF = io.BytesIO()
		formats.formatData("html", table, destF)
		return destF.getvalue(), testhelpers.getXMLTree(
			destF.getvalue(), debug=False)


class HTMLRenderTest(testhelpers.VerboseTest):
	resources = [("rendered", _RenderedHTML())]

	def _assertXpathText(self, xpath, value):
		els = self.rendered[1].xpath(xpath)
		self.assertEqual(len(els), 1, "Ambiguous xpath %s"%xpath)
		self.assertEqual(els[0].text, value)

	def testTitleFallbackOnName(self):
		self._assertXpathText("table/thead/tr[1]/th[1]", "I")

	def testTitleIsTablehead(self):
		self._assertXpathText("table/thead/tr[1]/th[4]", "Ja")

	def testDescriptionTitleEscaped(self):
		self.assertEqual(
			self.rendered[1].xpath("table/thead/tr[1]/th[2]")[0].get("title"),
			"some string <")

	def testNoteInTitle(self):
		self._assertXpathText("table/thead/tr[1]/th[4]/sup/a", "junk")

	def testIntRendered(self):
		self._assertXpathText("table/tbody/tr[1]/td[1]", "1")

	def testIntNull(self):
		self._assertXpathText("table/tbody/tr[2]/td[1]", "N/A")

	def testUnicodeRendered(self):
		self._assertXpathText("table/tbody/tr[1]/td[2]",
			"Hnä")

	def testTextNull(self):
		self._assertXpathText("table/tbody/tr[2]/td[2]", "N/A")
	
	def testDefaultDateDisplay(self):
		self._assertXpathText("table/tbody/tr[1]/td[3]",
			"2005-04-03T02:01:00")

	def testDateNull(self):
		self._assertXpathText("table/tbody/tr[2]/td[3]", "N/A")

	def testISODateDisplay(self):
		self._assertXpathText("table/tbody/tr[1]/td[4]",
			"2005-04-03T02:22:00")

	def testSingleFormatter(self):
		self._assertXpathText("table/tbody/tr[1]/td[5]",
			"&gurke<")

	def testSingleFormatterNull(self):
		self._assertXpathText("table/tbody/tr[2]/td[5]", "N/A")

	def testRowFormatter(self):
		self._assertXpathText("table/tbody/tr[1]/td[6]/a",
			"gurke")
		anchor = self.rendered[1].xpath("table/tbody/tr[1]/td[6]/a")[0]
		self.assertEqual(anchor.get("href"), "2")

	def testRowFormatterNull(self):
		self._assertXpathText("table/tbody/tr[2]/td[6]", "N/A")

	def testFootnotePresent(self):
		self._assertXpathText("dl/dd/p",
			"This column only here for no purpose at all")
		anchor = self.rendered[1].xpath("dl/dt/a")[0]
		self.assertEqual(anchor.get("name"), "note-junk")


class _RenderedKeyValueHTML(testhelpers.TestResource):
	def make(self, ignored):
		td = base.parseFromString(outputdef.OutputTableDef, HTML_OUTPUT_DEF)
		table = rsc.TableForDef(td, rows=[
			_mkr(1, "Hnä",
				datetime.datetime(2005, 4, 3, 2, 1),
				datetime.datetime(2005, 4, 3, 2, 22), "gurke", None)])
		frag = htmltable.HTMLKeyValueFragment(table, svcs.emptyQueryMeta)
		rendered = b"<div>"+formal.flattenSync(frag)+b"</div>"
		return testhelpers.getXMLTree(rendered, debug=False)


class HTMLKeyValueTest(testhelpers.VerboseTest):
	resources = [("tree", _RenderedKeyValueHTML())]

	def testValues(self):
		self.assertEqual(
			[e.text for e in self.tree.xpath("//td[@class='data']")],
			['1', 'Hn\xe4', '2005-04-03T02:01:00',
				'2005-04-03T02:22:00', '&gurke<', None])

	def testDescriptions(self):
		self.assertEqual(
			[e.text for e in self.tree.xpath("//td[@colspan='2']")],
			['Ha!', 'some string <', None, None, None, None])
	
	def testUnitsRendered(self):
		self.assertEqual(self.tree.xpath("//th/br")[0].tail,
			"[Msol]")

	def testFootnotesRendered(self):
		self.assertEqual(self.tree.xpath("dl[dt/a/@name='note-junk']/dd/p")[0].text,
			"This column only here for no purpose at all")


class GeojsonTest(testhelpers.VerboseTest):
	def testSepcooAnnotation(self):
		td = base.parseFromString(rscdef.TableDef, """<table>
			<dm>
				(geojson:FeatureCollection){
					crs: (geojson:CRS) {
						type: name
						properties: (geojson:CRSProperties) {
							name: "urn:private:metric"
						}
					}
					feature: (geojson:Feature) {
						geometry: (geojson:Geometry) {
							type: sepcoo
							latitude: @lat
							longitude: @long
						}
					}
				}
			</dm>
			<column name="lat"/>
			<column name="long"/>
			<column name="name" type="text"/>
			<column name="color" type="text"/>
			</table>""")
		table = rsc.TableForDef(td, rows=[
			{"lat": 1.2, "long": 3.4, "name": "rock1", "color": "red"},
			{"lat": 2.2, "long": -3.4, "name": "rock2", "color": "blue"},])
		tx = formats.getFormatted("geojson", table)
		self.assertEqual(json.loads(tx), {
			"crs": {"type": "name", "properties": {"name": "urn:private:metric"}},
			"type": "FeatureCollection",
			"features": [
				{"type": "Point", "properties": {"name": "rock1", "color": "red"},
					"coordinates": [3.4, 1.2]},
				{"type": "Point", "properties": {"name": "rock2", "color": "blue"},
					"coordinates": [-3.4, 2.2]}]})

	def testSepArrayAnnotation(self):
		td = base.parseFromString(rscdef.TableDef, """<table>
			<dm>
				(geojson:FeatureCollection){
					crs: (geojson:CRS) {
						type: name
						properties: (geojson:CRSProperties) {
							name: "urn:private:metric"
						}
					}
					feature: (geojson:Feature) {
						geometry: (geojson:Geometry) {
							type: seppoly
							c1_1: @c1min
							c2_1: @c2min
							c1_2: @c1max
							c2_2: @c2max
						}
					}
				}
			</dm>
			<column name="c1min"/>
			<column name="c1max"/>
			<column name="c2min"/>
			<column name="c2max"/>
			<column name="c3min"/>
			<column name="c3max"/>
			<column name="name" type="text"/>
			<column name="color" type="text"/>
			</table>""")
		table = rsc.TableForDef(td, rows=[
			{"c1min": 2.2, "c1max": 3.4, "c2min": -1, "c2max": 2.2,
				"c3min": 2003, "c3max": 3004,
				"name": "rock2", "color": "blue"},])
		tx = formats.getFormatted("geojson", table)
		self.assertEqual(json.loads(tx), {
				'crs': {'type': 'name',
				'properties': {'name': 'urn:private:metric'}},
				'type': 'FeatureCollection',
				'features': [{'type': 'Polygon',
					'properties': {'color': 'blue', 'c3min': 2003,
					'c3max': 3004, 'name': 'rock2'},
				'coordinates': [[2.2, 3.4], [-1, 2.2], [2.2, 3.4]]}]})

	def testSepSimplexAnnotation(self):
		td = base.parseFromString(rscdef.TableDef, """<table>
			<dm>
				(geojson:FeatureCollection){
					crs: (geojson:CRS) {
						type: link
						properties: (geojson:CRSProperties) {
							href: "http://where.ever/fantasy"
							type: simple
						}
					}
					feature: (geojson:Feature) {
						geometry: (geojson:Geometry) {
							type: sepsimplex
							c1min: @c1min
							c2min: @c2min
							c1max: @c1max
							c2max: @c2max
						}
					}
				}
			</dm>
			<column name="c1min"/>
			<column name="c1max"/>
			<column name="c2min"/>
			<column name="c2max"/>
			</table>""")
		table = rsc.TableForDef(td, rows=[
			{"c1min": 2.2, "c1max": 3.4, "c2min": -1, "c2max": 2.2},])
		tx = formats.getFormatted("geojson", table)
		self.assertEqual(json.loads(tx), {
			'crs': {'type': 'url',
				'properties': {
						'href': 'http://where.ever/fantasy',
						'type': 'simple'}},
			'type': 'FeatureCollection',
			'features': [
				{'type': 'Polygon', 'properties': {},
					'coordinates': [[2.2, -1], [2.2, 2.2], [3.4, 2.2],
						[3.4, -1], [2.2, -1]]}]})
	
	def testGeometryAnnotation(self):
		td = base.parseFromString(rscdef.TableDef, """<table>
			<dm>
				(geojson:FeatureCollection){
					feature: (geojson:Feature) {
						geometry: (geojson:Geometry) {
							type: geometry
							value: @s_region
						}
					}
				}
			</dm>
			<column name="s_region" type="spoly"/>
			<column name="observed_at" type="timestamp" unit="d"/>
			<column name="s_point" type="spoint"/>
			</table>""")
		table = rsc.TableForDef(td, rows=[{
			"s_region": pgsphere.SPoly.fromDALI([1.25, 1.0, 3, 9.5, 0.5, 4.25]),
			"observed_at": datetime.datetime(2017, 2, 7),
			"s_point": pgsphere.SPoint.fromDegrees(4, 5)
			},])
		tx = formats.getFormatted("geojson", table)
		self.assertEqual(json.loads(tx), {
		 'features': [{
		 	 'coordinates': [[1.25, 1.0], [3.0, 9.5], [0.5, 4.25]],
				'properties': {'observed_at': 2457791.5,
					's_point': [4., 5.]},
				'type': 'Polygon'},],
				'type': 'FeatureCollection'})


class ExtensionRecTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		fName, expectedMT = sample
		self.assertEqual(
			formats.guessMediaType(fName), expectedMT)
	
	samples = [
		("bla.vot", base.votableType),
		("/kvat/hnooik/bla.fits", "application/fits"),
		("http://www.foo.bar/img.ps", "application/postscript"),
		("http://www.foo.bar/map.geojson", "application/geo+json"),
		("http://www.foo.bar/data.json", "application/json"),
		("rund.data.tsv", "text/tab-separated-values"),
		("cat.csv", "text/csv"),
		("cat.vottd", "application/x-votable+xml;serialization=TABLEDATA"),
		("cat.vot5",
			"application/x-votable+xml;serialization=TABLEDATA;version=1.5"),
	]


class ExtensionGenTest(testhelpers.VerboseTest, metaclass=testhelpers.SamplesBasedAutoTest):
	def _runTest(self, sample):
		mt, ext = sample
		self.assertEqual(
			ext, formats.getExtensionFor(mt))

	samples = [
		(base.votableType, ".vot"),
		("application/x-votable+xml; content=datalink", ".vot"),
		("votabletd", ".vottd"),
		("fits", ".fits"),
		("tsv", ".tsv"),
		("text/html", ".html"),
		("text/html;charset=UTF-8", ".html"),
		("text/csv", ".csv"),
		("application/json", ".json"),
		("application/fits", ".fits"),
		("image/fits", ".fits"),
		("image/png", ".png"),
#		("image/jpeg", ".jpeg"),  this is not predicatable in mimetypes
		("vnd.gavo/invalid", ".dat"),
	]


if __name__=="__main__":
	testhelpers.main(HTMLRenderTest)
