"""
Unit tests roughly to do with obscore.
"""

#c Copyright 2008-2021, the GAVO project <gavo@ari.uni-heidelberg.de>
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import contextlib
import os

from gavo.helpers import testhelpers
from gavo import api
from gavo import base
from gavo.protocols import tap
from gavo.registry import builders

import tresc

_obscoreRDTrunk = """<resource schema="test" resdir="data">
			<table id="glob" onDisk="True" mixin="//products#table">
				<param name="foo" type="text">replaced</param>
				<mixin %s>//obscore#publish</mixin>
				<publish/>
			</table>
			<data id="import">
				<dictlistGrammar>
					<rowfilter procDef="//products#define">
						<bind key="table">"test.glob"</bind>
						<bind key="accref">@accref</bind>
						<bind key="path">@accref</bind>
						<bind key="fsize">22</bind>
					</rowfilter>
				</dictlistGrammar>
				<make table="glob"/>
			</data></resource>"""


class ObscoreTest(testhelpers.VerboseTest):

	def testTypeRequired(self):
		self.assertRaisesWithMsg(api.StructureError,
			'At IO:\'<resource schema="test" resdir="data"> <table id="glo...\', (4, 29):'
			" Mixin parameter dataproduct_type mandatory",
			api.parseFromString,
			(api.RD, _obscoreRDTrunk%""))

	def testRestriction(self):
		self.assertRaises(api.StructureError,
			api.parseFromString,
			api.RD, _obscoreRDTrunk%'productType="\'image\'" ',
			context=base.ParseContext(restricted=True))

	def testObscoreProperty(self):
		rd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'" ')
		viewPart = rd.tables[0].properties["obscoreClause"]
		self.assertTrue("CAST('image' AS text) AS dataproduct_type" in viewPart)
		self.assertTrue("CAST(__COMPUTE__ AS text) AS obs_publisher_did" 
			in viewPart)
		self.assertTrue(
			"CAST(accsize/1024 AS bigint) AS access_estsize" in viewPart)
		self.assertTrue("CAST(NULL AS spoly) AS s_region" in viewPart)

	def testScriptAdded(self):
		rd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'" ')
		for script in rd.getById("import").makes[0].table.scripts:
			if script.id=="addTableToObscoreSources":
				break
		else:
			self.fail("addTableToObscoreSources not added -- did obscore#publish"
				" run?")


class _ObscorePublishedTable(testhelpers.TestResource):

	resources = [('conn', tresc.dbConnection)]

	def make(self, dependents):
		conn = dependents["conn"]
		from gavo import rsc

		dd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'"'
			' collectionName="\'testing detritus\'"'
			' creatorDID="\'\\getParam{foo}\'"').getById("import")
		dd.rd.sourceId = "__testing__"
		d = rsc.makeData(dd, 
			forceSource=[{"accref": "foo/bar"}],
			connection=conn, 
			parseOptions=rsc.getParseOptions(buildDependencies=True))
		tap.publishToTAP(dd.rd, conn)
		return d

	def clean(self, data):
		conn = data.tables["glob"].connection
		try:
			data.drop(data.dd, connection=conn)
			tap.unpublishFromTAP(data.dd.rd, conn)
			conn.commit()
		except:
			import traceback
			traceback.print_exc()
			conn.rollback()
		res = list(data.tables["glob"].connection.query("select sqlFragment"
			" from ivoa._obscoresources where tableName='test.glob'"))
		# Yes, this is a test within a test resource.  It's most
		# convenient this way.  I'm sorry.
		assert len(res)==0


class ObscorePublishedTest(testhelpers.VerboseTest):

	resources = [
		('oc', tresc.obscoreTable),
		('data', _ObscorePublishedTable()),
		('hcd', tresc.ssaTestTable),
		('siap', tresc.siapTestTable),
		('conn', tresc.dbConnection)]

	def testJoinPresent(self):
		res = list(self.data.tables["glob"].connection.query("select sqlFragment"
			" from ivoa._obscoresources where tableName='test.glob'"))
		self.assertEqual(len(res), 1)
		self.assertTrue("('ivo://x-testing/~?'"
			" || gavo_urlescape(accref)) AS obs_publisher_did," in res[0][0])

	def testDataIsInObscore(self):
		from gavo import rsc
		oct = rsc.TableForDef(
			base.caches.getRD("//obscore").getById("ObsCore"),
			connection=self.conn)
		res = list(oct.iterQuery(oct.tableDef,
			"obs_id='foo/bar'"))
		self.assertEqual(len(res), 1)
		self.assertEqual(res[0]["dataproduct_type"], 'image')
		self.assertEqual(res[0]["access_estsize"], 0)

	def testAccessibleThroughADQL(self):
		from gavo.protocols import adqlglue
		from gavo.formats import votablewrite
		connection=self.data.tables["glob"].connection
		res = adqlglue.runTAPQuery("select * from ivoa.ObsCore where"
			" obs_collection='testing detritus'", 10, connection, [], 100,
			autoClose=False)
		self.assertTrue(b'<TD>foo/bar</TD>'
			in votablewrite.getAsVOTable(res, tablecoding="td"))

	def testMacroIsExpanded(self):
		res = list(
			self.data.tables["glob"].connection.queryToDicts(
				"select obs_creator_did from ivoa.obscore"
				" where obs_publisher_did='ivo://x-testing/~?foo/bar'"))
		try:
			self.assertEqual(res, [{'obs_creator_did': 'replaced'}])
		except AssertionError:
			import pdb;pdb.Pdb(nosigint=True).set_trace()
			raise

	def testModelDeclarationPresent(self):
		res = list(
			self.conn.query("SELECT dmivorn FROM tap_schema.supportedmodels"
				" WHERE dmname ILIKE 'obscore%%'"))
		self.assertEqual(res, [('ivo://ivoa.net/std/ObsCore#core-1.1',)])


class TablesetTest(testhelpers.VerboseTest):
	resources = [
		('oc', tresc.obscoreTable), 
		("table", tresc.ssaTestTable),
		("ssa", tresc.ssaTestTable)]

	def testObscoreStubInOwnTableset(self):
		vorElement = builders.getVOResourceElement(self.table.tableDef)
		tree = testhelpers.getXMLTree(vorElement.render(), debug=False)

		obscoreStub = tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']")[0]
		self.assertFalse(
			obscoreStub.xpath("column"),
			"ivoa.obscore on published table is not just a stub.")

	def testInTAPTableset(self):
		vorElement = builders.getVOResourceElement(
			base.resolveCrossId("//tap#run"))
		tree = testhelpers.getXMLTree(vorElement.render(), debug=False)
		self.assertEqual(len(tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']")), 1)
		self.assertEqual(len(tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']/column[name='s_ra']")), 1)

		tableInSchema = tree.xpath(
			"metadata/Resource/tableset/schema[name='test']"
			"/table[name='test.hcdtest']")[0]


class _ModifiedObscoreTables(testhelpers.TestResource):

	def make(self, dependents):
		with testhelpers.userconfigContent("""
				<STREAM id="obscore-extraevents">
					<property name="obscoreClause" cumulate="True">
						,
						CAST(\\\\plutoLong AS real) AS pluto_long,
						CAST(\\\\plutoLat AS real) AS pluto_lat
					</property>
				</STREAM>
				<STREAM id="obscore-extrapars">
					<mixinPar name="plutoLong">NULL</mixinPar>
					<mixinPar name="plutoLat">22</mixinPar>
				</STREAM>
				<STREAM id="obscore-extracolumns">
					<column name="pluto_long" tablehead="lambda_Pluto"/>
					<column name="pluto_lat"/>
				</STREAM>"""):

			base.caches.clearForName("__system__/obscore")
			with testhelpers.testFile(
				os.path.join(api.getConfig("inputsDir"), "ex.rd"), """
					<resource schema="__system">
						<table id="instable" onDisk="yes">
							<mixin plutoLong="56">//obscore#publishSSAPHCD</mixin>
						</table>
					</resource>
				""") as fName:
				insTable = base.caches.getRD(fName).getById("instable")
			ocTable = base.caches.getRD("//obscore").getById("ObsCore")
		base.caches.clearForName("__system__/obscore")
	
		return insTable, ocTable
	

class ObscoreModificationTest(testhelpers.VerboseTest):
	
	resources = [("tables", _ModifiedObscoreTables())]

	def testObscoreTableChanged(self):
		_, obscoreTD = self.tables
		self.assertEqual(obscoreTD.getColumnByName("pluto_long").tablehead,
			"lambda_Pluto")

	def testSubstrateChanged(self):
		substrateTD, _ = self.tables
		self.assertTrue("CAST(56 AS real) AS pluto_long" in
			substrateTD.getProperty("obscoreClause"))
		self.assertTrue("CAST(22 AS real) AS pluto_lat" in
			substrateTD.getProperty("obscoreClause"))


class _ObscoreLikeTD(testhelpers.TestResource):
	def make(self, ignored):
		return api.parseFromString(api.RD,
			"""<resource schema="test">

				<table id="ol" onDisk="True">
					<mixin
						dataproduct_type="'image'"
						dataproduct_subtype="NULL"
						>//obscore#publishObscoreLike</mixin>
					<FEED source="//obscore#obscore-columns">
						<PRUNE name="dataproduct_.*"/>
					</FEED>
				</table>
				</resource>
			""").getById("ol")


class ObscoreLikeTest(testhelpers.VerboseTest):
	resources = [("td", _ObscoreLikeTD())]

	def testObscoreColumnsPresent(self):
		self.assertEqual(self.td.getColumnByName("obs_id").description,
			"Unique identifier for an observation")
	
	def testPurgedColumnsMissing(self):
		self.assertRaisesWithMsg(api.NotFoundError,
			"column 'dataproduct_type' could not be located in table ol",
			self.td.getColumnByName,
			("dataproduct_type",))

	def testObscoreClause(self):
		oc = self.td.expand(self.td.getProperty("obscoreClause"))
		self.assertTrue("CAST('image' AS text) AS dataproduct_type," in oc)
		self.assertTrue("	CAST(calib_level AS smallint) AS calib_level," in oc)

	def testTableName(self):
		oc = self.td.expand(self.td.getProperty("obscoreClause"))
		self.assertTrue("CAST('test.ol' AS text) AS source_table" in oc)


class ObscoreDroppingTest(testhelpers.VerboseTest):
	# make sure you don't commit anything here, as other tests rely on
	# having obscore live
	resources = [
		('oc', tresc.obscoreTable),]

	def testDMUndeclared(self):
		from gavo.user import dropping

		class opts:
			systemImport = True
			force = False
			dropAll = True

		with base.getWritableAdminConn() as conn:
			dropping._do_dropRD(opts, "//obscore", conn)
			res = list(conn.query(
				"SELECT dmivorn FROM tap_schema.supportedmodels"
				" WHERE dmname ILIKE 'obscore%%'"))
			self.assertEqual(res, [])
			conn.rollback()


class ObscoreRecoverTest(testhelpers.VerboseTest):
	resources = [
		("conn", tresc.dbConnection),
		('oc', tresc.obscoreTable),]

	@contextlib.contextmanager
	def crap(self, conn):
		conn.execute("insert into ivoa._obscoresources"
			" (tablename, sqlfragment, sourcerd)"
			" values"
			" ('tap_schema.tables', 'totally borken', 'non/existing'),"
			" ('non.existing', 'select * from tap_schema.tables', 'non/existing')")
		try:
			yield 
		finally:
			conn.rollback()

	def testCrapBombs(self):
		with self.crap(self.conn):
			self.assertRaisesWithMsg(base.StructureError,
				api.EqualingRE(".*Execution of python script createObscoreView"
					" failed: syntax error at or near \"totally.*"),
				api.makeData,
				(api.resolveCrossId("//obscore#create"),), 
				connection=self.conn)

	def testCleanup(self):
		with self.crap(self.conn):
			api.makeData(
				api.resolveCrossId("//obscore#recover"), connection=self.conn)
			
			obscoreTables = set(r[0] for r in
				self.conn.query("select tablename from ivoa._obscoresources"))

			self.assertFalse("non.existing" in obscoreTables,
				"Non-existing table not purged from obscore")
			self.assertFalse("tap_schema.tables" in obscoreTables,
				"Table with bad view contribution not purged from obscore tables")

if __name__=="__main__":
	testhelpers.main(ObscoreModificationTest)
