"""
Tests for our datalink implementation.

Note that there's also sodatest, which should contain tests for
DaCHS-defined meta makers and data functions (at least where concerned
with parameters contained in SODA).
"""

#c Copyright 2008-2023, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import gc

from gavo.helpers import testhelpers

from gavo import api
from gavo import svcs
from gavo import votable
from gavo.helpers import trialhelpers
from gavo.protocols import datalink

import tresc


class MetaErrorSemanticsInferenceTest(testhelpers.VerboseTest):
	"""Tests for the semantics reported on errors coming from meta makers
	with broken code.
	"""
	def _getDatalinkErrors(self, metaMakerDef):
		"""returns the third item of what getMetaForDescriptor returns for
		a datalink core built with metaMakerDef and a contrived descriptor.

		metaMakerDef is RD XML that is spliced into the datalinkCore element.
		"""
		coreSource = """<datalinkCore>
			<descriptorGenerator><code>
					return ProductDescriptor("contrived", "contrived", None,
						"application/bogus")
			</code></descriptorGenerator>
			%s
			</datalinkCore>"""%metaMakerDef
		parsedCore = api.parseFromString(datalink.DatalinkCore, coreSource)

		# do not rename or remove args: it is not passed on explicitly,
		# but the datalink core picks it up on its own.
		args = {"ID": "contrived"} #noflake: variable stolen by dl core
		adaptedCore = parsedCore.adaptForRenderer(
			svcs.getRenderer("dlmeta"),
			svcs.emptyQueryMeta)

		_, _, faults = adaptedCore.getMetaForDescriptor(
			adaptedCore.descriptors[0])
		return faults

	def testSyntaxErrorInferred(self):
		# the meta maker code is syntactically broken ("return return");
		# the error must carry the semantics declared on the meta maker.
		firstError = self._getDatalinkErrors("""
			<metaMaker semantics="#grassy"><code>
				return return
				yield descriptor.makeLink("heu")
			</code></metaMaker>""")[0]
		self.assertEqual(firstError.semantics, "#grassy")

	def testDefaultError(self):
		# same broken code, but without declared semantics on the meta maker.
		firstError = self._getDatalinkErrors("""
			<metaMaker><code>
				return return
				yield descriptor.makeLink("heu")
			</code></metaMaker>""")[0]
		self.assertEqual(firstError.semantics,
			"http://dc.g-vo.org/datalink#other")


class _DumbSODAService(testhelpers.TestResource):
	"""A test resource yielding a service with an empty datalinkCore,
	parented to the test RD.
	"""
	resources = [("prodtestTable", tresc.prodtestTable)]

	def make(self, dependents):
		service = api.parseFromString(
			svcs.Service,
			"""<service id="uh"
			allowed="dlget">
			<datalinkCore>
			</datalinkCore></service>""")
		# the service is parsed stand-alone, so it gets its parent RD manually.
		service.parent = testhelpers.getTestRD()
		return service


class DLInterfaceTest(testhelpers.VerboseTest):
	"""Tests for what the dlget and dlmeta renderers do on a service with
	an empty datalink core when given missing or unresolvable IDs.
	"""
	resources = [("svc", _DumbSODAService()),
		("ssa", tresc.ssaTestTable)]

	def testIDNecessary(self):
		# dlget without any parameter must be rejected.
		callArgs = (self.svc, "dlget", {})
		self.assertRaisesWithMsg(api.ValidationError,
			"Field ID: ID is mandatory with dlget",
			trialhelpers.runSvcWith,
			callArgs)

	def testMissingAccref(self):
		# the message is expected in the fourth cell of the first row
		response = trialhelpers.runSvcWith(self.svc, "dlmeta",
			{"ID": "junkjunk"})
		tree = testhelpers.getXMLTree(response[1])
		messageCell = tree.xpath("//TR/TD[4]")[0]
		self.assertEqual(messageCell.text,
			"NotFoundFault: accref 'junkjunk' could"
			" not be located in product table")

	def testWrongAuthority(self):
		# an ivo:// identifier with a foreign authority
		response = trialhelpers.runSvcWith(self.svc, "dlmeta",
			{"ID": "ivo://junkjunk/knatter"})
		tree = testhelpers.getXMLTree(response[1])
		messageCell = tree.xpath("//TR/TD[4]")[0]
		self.assertEqual(messageCell.text,
			"NotFoundFault: The authority in the dataset identifier"
			" 'ivo://junkjunk/knatter' could not be located in the"
			" authorities managed here")

	def testNoNoArgProc(self):
		response = trialhelpers.runSvcWith(self.svc, "dlmeta",
			{"ID": "data/spec3.ssatest.vot"})
		tree = testhelpers.getXMLTree(response[1], debug=False)
		# there's no inputKey-generating metaMaker here, so there shouldn't
		# be a #proc line or a meta resource
		procCells = tree.xpath("//TD[.='#proc']")
		self.assertEqual(len(procCells), 0)
		allResources = tree.xpath("//RESOURCE")
		self.assertEqual(len(allResources), 1)


class _MetaMakerTestData(testhelpers.TestResource):
	"""Test data for datalink metadata generation.

	The resource value is a tuple of (media type of the dlmeta response,
	XML tree of the response, list made from the first item
	votable.parseBytes yields for the response, XML tree of the rendered
	capability element of the service's first publication).
	"""
	resources = [
		("prodtestTable", tresc.prodtestTable)]

	def make(self, dependents):
		service = api.parseFromString(svcs.Service, """
		<service id="foo" allowed="dlget,dlmeta,static">
			<property key="staticData">data</property>
			<meta name="dlget.description">A sample processing service</meta>
			<datalinkCore>
				<metaMaker>
					<code>
					ik = MS(InputKey, name="format", type="text",
						ucd="meta.format",
						description="Output format desired",
						values=MS(Values,
							options=[MS(Option, content_=descriptor.mime),
								MS(Option, content_="application/fits")]))
					ik.setProperty("defaultForForm", "application/fits")
					yield ik
					</code>
				</metaMaker>

				<metaMaker semantics="#sibling">
					<code>
					yield descriptor.makeLink("http://foo/bar",
						contentType="test/junk",
						contentLength=500002,
						localSemantics="geneigt",
						contentQualifier="#other")
					yield descriptor.makeLink("http://foo/baz",
						contentType="test/gold",
						semantics="#calibration")
					</code>
				</metaMaker>
				<metaMaker semantics="#othermess">
					<code>
						if descriptor.pubDID.endswith("b.imp"):
							yield DatalinkFault.NotFoundFault("ivo://not.asked.for",
								"Cannot locate other mess")
					</code>
				</metaMaker>
				<metaMaker semantics="http://www.g-vo.org/dl#unrelated">
					<code>
						yield descriptor.makeLinkFromFile(
							"no.such.file", description="An unrelated nothing")
						yield descriptor.makeLinkFromFile(
							"expected.missing", description="A file that just does not"
							" exist and should not create diagnostics.",
							semantics="http://www.g-vo.org/dl#mustnothappen",
							suppressMissing=True)
					</code>
				</metaMaker>
				<metaMaker>
					<code>
						yield descriptor.makeLinkFromFile("data/map1.map",
							description="Some mapping",
							semantics="http://www.g-vo.org/dl#related",
							contentQualifier="#something")
					</code>
				</metaMaker>
				<dataFunction procDef="//soda#generateProduct"/>
			</datalinkCore>
			<publish render="dlmeta" sets="ivo_managed"/>
			</service>""")
		service.parent = testhelpers.getTestRD()

		# NOTE(review): the key is a lower-case "id" here, while the other
		# tests in this module pass "ID"; presumably that is on purpose
		# (parameter names being case-insensitive) -- confirm.
		requestedIds = [
			api.getStandardPubDID("data/a.imp"),
			api.getStandardPubDID("data/b.imp"),
			]
		mediaType, payload = trialhelpers.runSvcWith(
			service, "dlmeta", {"id": requestedIds})

		from gavo.registry import capabilities
		capabilityEl = capabilities.getCapabilityElement(
			service.publications[0])

		responseTree = testhelpers.getXMLTree(payload, debug=False)
		rawRows = list(next(votable.parseBytes(payload)))
		capabilityTree = testhelpers.getXMLTree(
			capabilityEl.render(), debug=False)
		return (mediaType, responseTree, rawRows, capabilityTree)

_metaMakerTestData = _MetaMakerTestData()


class DatalinkMetaMakerTest(testhelpers.VerboseTest):
	"""Tests on the dlmeta response document and the capability element
	coming out of _metaMakerTestData.

	In serviceResult, index 0 is the media type, index 1 the XML tree of
	the response, and index 3 the XML tree of the capability element.
	"""
	resources = [("serviceResult", _metaMakerTestData),
		("prodtestTable", tresc.prodtestTable)]

	def testStandardId(self):
		responseTree = self.serviceResult[1]
		standardInfo = responseTree.xpath("INFO[@name='standardID']")[0]
		self.assertTrue(standardInfo.text.startswith("Written by DaCHS"))
		self.assertEqual(standardInfo.get("value"),
			"ivo://ivoa.net/std/datalink")

	def testIDSet(self):
		# one service resource per requested dataset, in request order
		responseTree = self.serviceResult[1]
		expectedIds = [
			"ivo://x-testing/~?data/a.imp",
			"ivo://x-testing/~?data/b.imp"]
		for index, expectedId in enumerate(expectedIds):
			serviceEl = responseTree.xpath(
				"//RESOURCE[@utype='adhoc:service']")[index]
			idParam = serviceEl.xpath("GROUP/PARAM[@name='ID']")[0]
			self.assertEqual(idParam.get("value"), expectedId)

	def testMimeOk(self):
		mediaType = self.serviceResult[0]
		self.assertEqual(mediaType,
			"application/x-votable+xml;content=datalink")

	def testUCDPresent(self):
		formatParam = self.serviceResult[1].xpath("//PARAM[@name='format']")[0]
		self.assertEqual(formatParam.get("ucd"), "meta.format")

	def testTypeTranslationWorks(self):
		# the input key was declared with type="text"
		formatParam = self.serviceResult[1].xpath("//PARAM[@name='format']")[0]
		self.assertEqual(formatParam.get("arraysize"), "*")

	def testPreset(self):
		# the input key has its defaultForForm property set to
		# application/fits
		presetLink = self.serviceResult[1].xpath(
			"//PARAM[@name='format']/LINK")[0]
		self.assertEqual(presetLink.get("content-role"), "#pre-set")
		self.assertEqual(presetLink.get("value"), "application/fits")

	def testOptionsRepresented(self):
		responseTree = self.serviceResult[1]
		optionPath = "//PARAM[@name='format']/VALUES/OPTION"
		self.assertEqual(
			responseTree.xpath(optionPath)[0].get("value"),
			"text/plain")
		self.assertEqual(
			responseTree.xpath(optionPath)[1].get("value"),
			"application/fits")

	def testAccessURLPresent(self):
		accessParam = self.serviceResult[1].xpath(
			"//PARAM[@name='accessURL']")[0]
		self.assertEqual(accessParam.get("value"),
			"http://localhost:8080/data/test/foo/dlget")

	def testCapability(self):
		capabilityTree = self.serviceResult[3]
		interfaceEl = capabilityTree.xpath("//interface")[0]
		xsiType = interfaceEl.attrib[
			"{http://www.w3.org/2001/XMLSchema-instance}type"]
		self.assertEqual(xsiType, "vs:ParamHTTP")
		self.assertEqual(interfaceEl.xpath("queryType")[0].text, "GET")
		self.assertEqual(interfaceEl.xpath("resultType")[0].text,
			'application/x-votable+xml;content=datalink')
		self.assertEqual(interfaceEl.xpath("accessURL")[0].text,
			'http://localhost:8080/data/test/foo/dlmeta')

		capabilityEl = self.serviceResult[3].xpath("/capability")[0]
		self.assertEqual(capabilityEl.attrib["standardID"],
			"ivo://ivoa.net/std/DataLink#links-1.0")

	def testCapabilityParameters(self):
		# any parameter not listed below makes the test fail.
		interfaceEl = self.serviceResult[3].xpath("//interface")[0]
		for paramEl in interfaceEl.xpath("param"):
			paramName = paramEl.xpath("name")[0].text

			if paramName=="ID":
				self.assertEqual(paramEl.attrib["std"], "true")
				self.assertEqual(paramEl.xpath("ucd")[0].text,
					"meta.id;meta.main")

			elif paramName=="responseformat":
				typeEl = paramEl.xpath("dataType")[0]
				self.assertEqual(typeEl.text, "char")
				self.assertEqual(typeEl.get("arraysize"), "*")

			elif paramName=="maxrec":
				self.assertEqual(paramEl.xpath("description")[0].text,
					"Maximum number of records returned."
					" Pass 0 to retrieve service parameters.")

			elif paramName=="verb":
				pass # forget it; perhaps we'll use it some day.

			else:
				raise AssertionError("Unexpected Parameter %s"%paramName)

	def testAsyncDeclared(self):
		# with dlasync allowed, both a sync and an async service block
		# are expected in the response.
		service = api.parseFromString(svcs.Service, """
		<service id="foo" allowed="dlget,dlasync,dlmeta">
			<datalinkCore>
				<metaMaker>
					<code>
						yield MS(InputKey, name="PAR", type="text")
					</code>
				</metaMaker>
				<dataFunction procDef="//soda#generateProduct"/>
			</datalinkCore>
			</service>""")
		service.parent = testhelpers.getTestRD()

		mediaType, payload = trialhelpers.runSvcWith(service, "dlmeta", {
			"ID": [
				api.getStandardPubDID("data/a.imp"),
				]})

		responseTree = testhelpers.getXMLTree(payload, debug=False)

		self.assertEqual(len(responseTree.xpath("//TR")), 4)

		foundAccessURLs = set(
			param.get("value")
			for param in responseTree.xpath("//PARAM[@name='accessURL']"))
		self.assertEqual(
			{'http://localhost:8080/data/test/foo/dlget',
				'http://localhost:8080/data/test/foo/dlasync'},
			foundAccessURLs)

		foundStandardIds = set(
			param.get("value")
			for param in responseTree.xpath("//PARAM[@name='standardID']"))
		self.assertEqual(
			{'ivo://ivoa.net/std/SODA#async-1.0',
				'ivo://ivoa.net/std/SODA#sync-1.0'},
			foundStandardIds)

	def testCoreForgetting(self):
		# Makes sure an adapted core can be garbage collected after use.
		# The statements here are intentionally left as they were: the test
		# looks at locals() and at gc referrers, so introducing or renaming
		# locals could change what it observes.
		from gavo.svcs import renderers
		args = {"ID": api.getStandardPubDID("data/ex.fits")}
		svc = api.getRD("data/cores").getById("dl")
		renderer = renderers.getRenderer("dlmeta")
		gns = testhelpers.getMemDiffer()
		core = svc.core.adaptForRenderer(renderer, svcs.emptyQueryMeta)

		# the sentinel is only referenced from the core; as long as the
		# core's __dict__ is among its referrers, the core is alive.
		class _Sentinel(object):
			pass
		s = _Sentinel()
		core.ref = s

		coreId = id(core.__dict__)
		self.assertTrue(coreId in set(id(r) for r in gc.get_referrers(s)))
		it = svcs.CoreArgs.fromRawArgs(core.inputTable, args)
		core.runForMeta(svc, it, svcs.emptyQueryMeta)
		core.finalize()
		core.inputTable.breakCircles()
		del core
		del it
		gc.collect()

		# Hack to work around delayed removal of core from locals()
		# perhaps that's actually a python 3.7 bug?
		if "core" in locals():
			locals().pop("core")

		ns = gns()
		self.assertEqual(len(ns), 0, "Uncollected garbage: %s"%ns)
		self.assertFalse(coreId in set(id(r) for r in gc.get_referrers(s)),
			"core still lives and references s")


class _MetaMakerTestRows(testhelpers.TestResource):
	"""The datalink rows from _metaMakerTestData, as a dictionary mapping
	(ID, semantics) pairs to lists of row dictionaries.
	"""
	resources = [
		("serviceResult", _metaMakerTestData)]

	def make(self, dependents):
		responseDef = api.resolveCrossId("//datalink#dlresponse", None)
		rowsByKey = {}
		# index 2 of the service result is the list of raw rows
		for rawRow in dependents["serviceResult"][2]:
			rowDict = responseDef.makeRowFromTuple(rawRow)
			key = (rowDict["ID"], rowDict["semantics"])
			rowsByKey.setdefault(key, []).append(rowDict)
		return rowsByKey


class DatalinkMetaRowsTest(testhelpers.VerboseTest):
	"""Tests on the individual datalink rows generated for data/a.imp and
	data/b.imp by the sample service in _metaMakerTestData.

	self.rows maps (ID, semantics) pairs to lists of row dictionaries;
	self.serviceResult[1] is the XML tree of the full response.
	"""
	resources = [("rows", _MetaMakerTestRows()),
		("serviceResult", _metaMakerTestData)]

	def testAllLinks(self):
		# every (ID, semantics) combination must occur exactly once
		self.assertEqual(len(self.rows), 15)
		for r in self.rows.values():
			self.assertEqual(len(r), 1)

	def testAllWithId(self):
		self.assertEqual(set(r[0] for r in self.rows),
			set(['ivo://x-testing/~?data/b.imp',
				'ivo://x-testing/~?data/a.imp',
				'ivo://not.asked.for']))

	def testAccessURLStatic(self):
		self.assertEqual(self.rows[
			('ivo://x-testing/~?data/b.imp', '#sibling')][0]["access_url"],
			'http://foo/bar')

	def testAccessURLAccess(self):
		self.assertEqual(self.rows[
			('ivo://x-testing/~?data/b.imp', '#proc')][0]["access_url"],
			# take the actual access URL from the service block.
			None)

	def testAccessURLSelf(self):
		self.assertEqual(self.rows[
			('ivo://x-testing/~?data/b.imp', '#this')][0]["access_url"],
				"http://localhost:8080/getproduct/data/b.imp")
		self.assertEqual(self.rows[
			('ivo://x-testing/~?data/a.imp', '#this')][0]["access_url"],
				"http://localhost:8080/getproduct/data/a.imp")

	def testProcDescribed(self):
		# the description is what the service gives in its
		# dlget.description meta item
		self.assertEqual(self.rows[
			('ivo://x-testing/~?data/b.imp', '#proc')][0]["description"],
			"A sample processing service")

	def testMimes(self):
		self.assertEqual(self.rows[('ivo://x-testing/~?data/a.imp',
			'#calibration')][0]["content_type"], 'test/gold')

	def testSemantics(self):
		self.assertEqual(set(r[1] for r in self.rows),
			set(['#proc', '#this', '#sibling', '#calibration', '#preview',
				"http://dc.g-vo.org/datalink#other",
				'http://www.g-vo.org/dl#related',
				'http://www.g-vo.org/dl#unrelated',
				]))

	def testLocalSemantics(self):
		withLS = self.rows[('ivo://x-testing/~?data/b.imp',
			'#sibling')]
		self.assertEqual(withLS[0]["local_semantics"], "geneigt")

	def testSizes(self):
		self.assertEqual(self.rows[('ivo://x-testing/~?data/a.imp',
			'#sibling')][0]["content_length"], 500002)
		self.assertEqual(self.rows[('ivo://x-testing/~?data/a.imp',
			'#calibration')][0]["content_length"], None)

	def testServiceLink(self):
		# the #proc row must point to a meta resource in the response
		# through its service_def.
		svcRow = self.rows[('ivo://x-testing/~?data/a.imp',
			'#proc')][0]
		resId = svcRow["service_def"]
		for res in self.serviceResult[1].xpath("//RESOURCE"):
			if res.attrib.get("ID")==resId:
				break
		else:
			self.fail("Processing service not in datalink links")
		self.assertEqual(res.attrib.get("type"), "meta")
		self.assertEqual(res.attrib.get("utype"), "adhoc:service")

	def testSelfMeta(self):
		selfRow = self.rows[('ivo://x-testing/~?data/b.imp', '#this')][0]
		self.assertEqual(selfRow["content_type"], "text/plain")
		self.assertEqual(selfRow["content_length"], 73)

	def testMetaError(self):
		errors = self.rows[('ivo://not.asked.for', datalink.DEFAULT_SEMANTICS)]
		self.assertEqual(errors[0]["error_message"],
			'NotFoundFault: Cannot locate other mess')

	def testPreviewMetaURL(self):
		previewRow = self.rows[('ivo://x-testing/~?data/b.imp', '#preview')][0]
		self.assertEqual(previewRow["access_url"],
			"http://example.com/borken.jpg")
		self.assertEqual(previewRow["content_type"],
			"image/jpeg")

	def testPreviewMetaAuto(self):
		previewRow = self.rows[('ivo://x-testing/~?data/a.imp', '#preview')][0]
		self.assertEqual(previewRow["access_url"],
			"http://localhost:8080/getproduct/data/a.imp?preview=True")
		self.assertEqual(previewRow["content_type"],
			"text/plain")

	def testFromNonExistingFile(self):
		# makeLinkFromFile on a missing file without suppressMissing
		errRow = self.rows[('ivo://x-testing/~?data/b.imp',
			'http://www.g-vo.org/dl#unrelated')][0]
		self.assertEqual(errRow["error_message"],
			"NotFoundFault: No file for linked item")
		self.assertEqual(errRow["description"],
			"An unrelated nothing")

	def testExpectedNonExisting(self):
		self.assertFalse(('ivo://x-testing/~?data/b.imp',
			'http://www.g-vo.org/dl#mustnothappen') in self.rows,
			"suppressMissing in makeLinkFromFile broken?")

	def testFromFile(self):
		row = self.rows[('ivo://x-testing/~?data/b.imp',
			'http://www.g-vo.org/dl#related')][0]
		self.assertEqual(row["error_message"], None)
		self.assertEqual(row["content_length"], 8)
		self.assertEqual(row["description"], "Some mapping")
		self.assertEqual(row["content_type"], "application/octet-stream")

	def testSemanticsOverridable(self):
		# semantics passed to makeLink wins over the meta maker's
		# semantics attribute (#sibling).
		row = self.rows[('ivo://x-testing/~?data/b.imp', "#calibration")][0]
		self.assertEqual(row["content_type"], "test/gold")

	# The following three methods used to lack the test prefix, which
	# means unittest never collected them.

	def testContentQualifierDefault(self):
		row = self.rows[('ivo://x-testing/~?data/b.imp', "#calibration")][0]
		self.assertEqual(row["content_qualifier"], None)

	def testContentQualifierFromLink(self):
		row = self.rows[('ivo://x-testing/~?data/a.imp', "#sibling")][0]
		self.assertEqual(row["content_qualifier"], "#other")

	def testContentQualifierFromFile(self):
		row = self.rows[('ivo://x-testing/~?data/a.imp',
			"http://www.g-vo.org/dl#related")][0]
		self.assertEqual(row["content_qualifier"], "#something")


class _TimestampService(testhelpers.TestResource):
	"""A test resource yielding a datalink service with a two-element
	timestamp input key, a descriptor generator returning a fault for
	the pubDID "bogus", and a data function echoing its parsed input.
	"""
	def make(self, ignored):
		service = api.parseFromString(
			svcs.Service,
			"""
		<service id="foo" allowed="dlget,dlmeta">
			<datalinkCore>
				<metaMaker>
					<code>
						yield MS(InputKey, name="t", type="timestamp[2]",
							ucd="time.epoch",
							xtype="interval",
							values=MS(Values, default=[
								datetime.datetime(2020, 1, 1),datetime.datetime(2021, 1, 1)]))
					</code>
				</metaMaker>
				<descriptorGenerator>
					<code>
						if pubDID=="bogus":
							return DatalinkFault.NotFoundFault(pubDID, "No bogus data here")

						return ProductDescriptor(pubDID, "bogus", None,
							"application/bogus")
					</code>
				</descriptorGenerator>
				<dataFunction>
					<code>
						assert isinstance(args["t"][0], datetime.datetime)
						assert isinstance(args["t"][1], datetime.datetime)
						descriptor.data = "%s: %s"%(descriptor.pubDID, repr(args["t"]))
					</code>
				</dataFunction>
			</datalinkCore>
			</service>""")
		# parsed stand-alone, so the parent RD is set by hand
		service.parent = testhelpers.getTestRD()
		return service


class TimestampTest(testhelpers.VerboseTest):
	"""Tests for declaring and parsing timestamp interval parameters,
	and for faults returned from a descriptor generator.
	"""
	resources = [("svc", _TimestampService())]

	def testTimestampsMeta(self):
		mediaType, payload = trialhelpers.runSvcWith(self.svc, "dlmeta", {
			"ID": "beknackt"})
		responseTree = testhelpers.getXMLTree(payload, debug=False)
		# this sucks -- xtype=interval and xtype=timestamp is mutually
		# exclusive at this point.  I invent something.
		timeParams = responseTree.xpath(
			"//GROUP[@name='inputParams']/PARAM[@name='t']")
		self.assertEqual(len(timeParams), 1)

		expectedAttributes = [
			("xtype", "timestamp-interval"),
			("arraysize", "19x2"),
			("datatype", "char"),
			("value", "2020-01-01T00:00:002021-01-01T00:00:00")]
		for attName, expectedValue in expectedAttributes:
			self.assertEqual(timeParams[0].get(attName), expectedValue)

	def testTimestampsParsed(self):
		# the data function puts pubDID and the repr of the parsed
		# interval into descriptor.data
		requestPars = {
			"ID": "beknackt",
			"t": "2008-04-01T12:00:002008-05-01T12:00:00"}
		result = trialhelpers.runSvcWith(self.svc, "dlget", requestPars)
		# should be different when we properly do the xtype thing
		expected = ("beknackt: [datetime.datetime(2008, 4, 1, 12, 0),"
			" datetime.datetime(2008, 5, 1, 12, 0)]")
		self.assertEqual(result, expected)

	def testFaultsFromDescGen(self):
		# a fault returned by the descriptor generator becomes a single
		# error row.
		response = trialhelpers.runSvcWith(self.svc, "dlmeta", {
			"ID": "bogus"})
		rawData, tableMeta = votable.loads(response[1])
		rowDicts = list(tableMeta.iterDicts(rawData))

		self.assertEqual(len(rowDicts), 1)
		onlyRow = rowDicts[0]
		self.assertEqual(
			(onlyRow["ID"], onlyRow["error_message"], onlyRow["semantics"]),
			("bogus", "NotFoundFault: No bogus data here", "#this"))


# when executed as a script, only DatalinkMetaRowsTest is handed to
# the test runner.
if "__main__"==__name__:
	testhelpers.main(DatalinkMetaRowsTest)
