"""
Unit tests roughly to do with obscore.
"""

#c Copyright 2008-2019, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.

import os

from gavo.helpers import testhelpers
from gavo import api
from gavo import base
from gavo.protocols import tap
from gavo.registry import builders

import tresc

# RD source shared by several tests below.  The single %s is filled in
# with the attributes of the //obscore#publish mixin element (e.g.,
# productType); pass an empty string to apply the mixin without
# parameters.
_obscoreRDTrunk = """<resource schema="test" resdir="data">
			<table id="glob" onDisk="True" mixin="//products#table">
				<param name="foo" type="text">replaced</param>
				<mixin %s>//obscore#publish</mixin>
				<publish/>
			</table>
			<data id="import">
				<dictlistGrammar>
					<rowfilter procDef="//products#define">
						<bind key="table">"test.glob"</bind>
						<bind key="accref">@accref</bind>
						<bind key="path">@accref</bind>
						<bind key="fsize">22</bind>
					</rowfilter>
				</dictlistGrammar>
				<make table="glob"/>
			</data></resource>"""


class ObscoreTest(testhelpers.VerboseTest):
	"""Tests for parsing RDs built from _obscoreRDTrunk, i.e., tables
	that apply the //obscore#publish mixin.
	"""

	def testTypeRequired(self):
		"""applying the mixin without productType is a StructureError.
		"""
		self.assertRaisesWithMsg(api.StructureError,
			'At [<resource schema="test" res...], (4, 29):'
			" Mixin parameter productType mandatory",
			api.parseFromString,
			(api.RD, _obscoreRDTrunk%""))

	def testRestriction(self):
		"""parsing the RD in a restricted parse context is a StructureError.
		"""
		self.assertRaises(api.StructureError,
			api.parseFromString,
			api.RD, _obscoreRDTrunk%'productType="\'image\'" ',
			context=base.ParseContext(restricted=True))

	def testObscoreProperty(self):
		"""the mixin leaves an obscoreClause property with the expected
		CAST expressions on the table.
		"""
		rd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'" ')
		viewPart = rd.tables[0].properties["obscoreClause"]
		# assertTrue rather than the deprecated failUnless alias, matching
		# the usage elsewhere in this file.
		self.assertTrue("CAST('image' AS text) AS dataproduct_type" in viewPart)
		self.assertTrue("CAST($COMPUTE AS text) AS obs_publisher_did" in viewPart)
		self.assertTrue(
			"CAST(accsize/1024 AS bigint) AS access_estsize" in viewPart)
		self.assertTrue("CAST(NULL AS spoly) AS s_region" in viewPart)

	def testObscoreLateMixin(self):
		"""the first make of the import data has a script with the id
		addTableToObscoreSources.
		"""
		rd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'" ')
		for script in rd.getById("import").makes[0].scripts:
			if script.id=="addTableToObscoreSources":
				break
		else:
			# the loop ran to completion, so no script had the expected id
			self.fail("addTableToObscoreSources not added -- did obscore#publish"
				" run?")


class _ObscorePublishedTable(testhelpers.TestResource):
	"""A test resource importing one row into test.glob (built from
	_obscoreRDTrunk) and publishing the RD to TAP.

	The resource value is the data object returned by rsc.makeData.
	"""

	resources = [('conn', tresc.dbConnection)]

	def make(self, dependents):
		"""runs the //obscore makeSources and create data items, imports
		a single record with accref foo/bar, and publishes the RD to TAP,
		all on the connection from the conn dependency.
		"""
		conn = dependents["conn"]
		from gavo import rsc
		rsc.makeData(api.resolveCrossId("//obscore#makeSources"),
			connection=conn)
		rsc.makeData(api.resolveCrossId("//obscore#create"),
			connection=conn)

		dd = api.parseFromString(api.RD, 
			_obscoreRDTrunk%'productType="\'image\'"'
			' collectionName="\'testing detritus\'"'
			' creatorDID="\'\\getParam{foo}\'"').getById("import")
		dd.rd.sourceId = "__testing__"
		d = rsc.makeData(dd, forceSource=[{"accref": "foo/bar"}],
			connection=conn)
		tap.publishToTAP(dd.rd, conn)
		return d

	def clean(self, data):
		"""drops what make imported and checks that test.glob is gone
		from ivoa._obscoresources.

		A failing drop is reported via a traceback and rolled back
		rather than propagated.
		"""
		conn = data.tables["glob"].connection
		try:
			data.drop(data.dd, connection=conn)
			conn.commit()
		except Exception:
			# Best-effort cleanup: report and roll back.  Not a bare except,
			# so KeyboardInterrupt and SystemExit still get through.
			import traceback
			traceback.print_exc()
			conn.rollback()
		res = list(data.tables["glob"].query("select sqlFragment"
			" from ivoa._obscoresources where tableName='test.glob'"))
		# Yes, this is a test within a test resource.  It's most
		# convenient this way.  I'm sorry.
		assert len(res)==0


class ObscorePublishedTest(testhelpers.VerboseTest):
	"""Tests against the database state set up by _ObscorePublishedTable.
	"""

	resources = [
		('oc', tresc.obscoreTable),
		('data', _ObscorePublishedTable()),
		('conn', tresc.dbConnection)]

	def testJoinPresent(self):
		"""ivoa._obscoresources has exactly one fragment for test.glob,
		and it computes obs_publisher_did from accref.
		"""
		res = list(self.data.tables["glob"].query("select sqlFragment"
			" from ivoa._obscoresources where tableName='test.glob'"))
		self.assertEqual(len(res), 1)
		self.assertTrue("('ivo://x-unregistred/~?'"
			" || gavo_urlescape(accref)) AS obs_publisher_did," in res[0][0])

	def testDataIsInObscore(self):
		"""the imported record can be retrieved through the ObsCore table.
		"""
		from gavo import rsc
		# not called oct as that would shadow the builtin
		obscoreTable = rsc.TableForDef(
			base.caches.getRD("//obscore").getById("ObsCore"),
			connection=self.conn)
		res = list(obscoreTable.iterQuery(obscoreTable.tableDef,
			"obs_id='foo/bar'"))
		self.assertEqual(len(res), 1)
		self.assertEqual(res[0]["dataproduct_type"], 'image')
		self.assertEqual(res[0]["access_estsize"], 0)

	def testAccessibleThroughADQL(self):
		"""an ADQL query on ivoa.ObsCore returns the imported accref.
		"""
		from gavo.protocols import adqlglue
		from gavo.formats import votablewrite
		querier = api.UnmanagedQuerier(
			connection=self.data.tables["glob"].connection)
		res = adqlglue.query(querier, "select * from ivoa.ObsCore where"
			" obs_collection='testing detritus'")
		self.assertTrue('<TD>foo/bar</TD>'
			in votablewrite.getAsVOTable(res, tablecoding="td"))

	def testMacroIsExpanded(self):
		"""the \\getParam{foo} in creatorDID ends up as the value of the
		table's foo param.
		"""
		res = list(
			self.data.tables["glob"].connection.queryToDicts(
				"select obs_creator_did from ivoa.obscore"
				" where obs_publisher_did='ivo://x-unregistred/~?foo/bar'"))
		self.assertEqual(res, [{'obs_creator_did': u'replaced'}])

	def testModelDeclarationPresent(self):
		"""tap_schema.supportedmodels declares the obscore data model.
		"""
		res = list(
			self.conn.query("SELECT dmivorn FROM tap_schema.supportedmodels"
				" WHERE dmname ILIKE 'obscore%%'"))
		self.assertEqual(res, [('ivo://ivoa.net/std/ObsCore#core-1.1',)])


class TablesetTest(testhelpers.VerboseTest):
	"""Tests for how ivoa.obscore shows up in VOResource tablesets.
	"""
	resources = [('oc', tresc.obscoreTable), ("table", tresc.ssaTestTable)]

	def testObscoreStubInOwnTableset(self):
		"""the published table's own record lists ivoa.obscore without
		any columns.
		"""
		vorElement = builders.getVOResourceElement(self.table.tableDef)
		tree = testhelpers.getXMLTree(vorElement.render(), debug=False)

		# the [0] doubles as a check that the stub is there at all
		obscoreStub = tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']")[0]
		self.assertFalse(
			obscoreStub.xpath("column"),
			"ivoa.obscore on published table is not just a stub.")

	def testInTAPTableset(self):
		"""the TAP service record has ivoa.obscore exactly once, with an
		s_ra column, and also lists test.hcdtest.
		"""
		vorElement = builders.getVOResourceElement(
			base.resolveCrossId("//tap#run"))
		tree = testhelpers.getXMLTree(vorElement.render(), debug=False)
		self.assertEqual(len(tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']")), 1)
		self.assertEqual(len(tree.xpath(
			"metadata/Resource/tableset/schema[name='ivoa']"
			"/table[name='ivoa.obscore']/column[name='s_ra']")), 1)

		# an explicit assertion instead of indexing into the xpath result
		# and throwing away the element
		self.assertTrue(tree.xpath(
			"metadata/Resource/tableset/schema[name='test']"
			"/table[name='test.hcdtest']"),
			"test.hcdtest is missing from the TAP tableset")


class _ModifiedObscoreTables(testhelpers.TestResource):
	"""A test resource yielding table definitions obtained while
	userconfig carries obscore extension streams.

	The resource value is a pair (insTable, ocTable): the table "instable"
	from a temporary ex.rd applying //obscore#publishSSAPHCD with
	plutoLong="56", and the ObsCore table from //obscore.
	"""
	
	def make(self, dependents):
		# The userconfig content defines the three obscore-extra* streams:
		# extra obscoreClause items, the mixin parameters feeding them
		# (plutoLong defaulting to NULL, plutoLat to 22), and the matching
		# extra columns.
		with testhelpers.userconfigContent("""
				<STREAM id="obscore-extraevents">
					<property name="obscoreClause" cumulate="True">
						,
						CAST(\\\\plutoLong AS real) AS pluto_long,
						CAST(\\\\plutoLat AS real) AS pluto_lat
					</property>
				</STREAM>
				<STREAM id="obscore-extrapars">
					<mixinPar name="plutoLong">NULL</mixinPar>
					<mixinPar name="plutoLat">22</mixinPar>
				</STREAM>
				<STREAM id="obscore-extracolumns">
					<column name="pluto_long" tablehead="lambda_Pluto"/>
					<column name="pluto_lat"/>
				</STREAM>"""):

			# presumably this forces //obscore to be re-read with the
			# userconfig above in effect -- confirm against base.caches
			base.caches.clearForName("__system__/obscore")
			with testhelpers.testFile(
				os.path.join(api.getConfig("inputsDir"), "ex.rd"), """
					<resource schema="__system">
						<table id="instable" onDisk="yes">
							<mixin plutoLong="56">//obscore#publishSSAPHCD</mixin>
						</table>
					</resource>
				""") as fName:
				insTable = base.caches.getRD(fName).getById("instable")
			# fetched while the userconfig content is still active
			ocTable = base.caches.getRD("//obscore").getById("ObsCore")
		# presumably clears the modified //obscore from the cache so later
		# users do not see it -- confirm against base.caches
		base.caches.clearForName("__system__/obscore")
	
		return insTable, ocTable

	

class ObscoreModificationTest(testhelpers.VerboseTest):
	"""Tests that the userconfig streams set up in _ModifiedObscoreTables
	show up in both the ObsCore table and a table applying the mixin.
	"""

	resources = [("tables", _ModifiedObscoreTables())]

	def testObscoreTableChanged(self):
		"""the ObsCore table definition has the extra pluto_long column.
		"""
		_, obscoreTD = self.tables
		self.assertEqual(obscoreTD.getColumnByName("pluto_long").tablehead,
			"lambda_Pluto")

	def testSubstrateChanged(self):
		"""the mixed-in table's obscoreClause has the explicit plutoLong
		(56) and the default plutoLat (22).
		"""
		substrateTD, _ = self.tables
		# assertTrue rather than the deprecated failUnless alias
		self.assertTrue("CAST(56 AS real) AS pluto_long" in
			substrateTD.getProperty("obscoreClause"))
		self.assertTrue("CAST(22 AS real) AS pluto_lat" in
			substrateTD.getProperty("obscoreClause"))


class _ObscoreLikeTD(testhelpers.TestResource):
	"""A test resource yielding the table definition "ol", which applies
	//obscore#publishObscoreLike and feeds in the obscore columns minus
	those matching dataproduct_.*.
	"""
	def make(self, ignored):
		rdSource = """<resource schema="test">

				<table id="ol" onDisk="True">
					<mixin
						dataproduct_type="'image'"
						dataproduct_subtype="NULL"
						>//obscore#publishObscoreLike</mixin>
					<FEED source="//obscore#obscore-columns">
						<PRUNE name="dataproduct_.*"/>
					</FEED>
				</table>
				</resource>
			"""
		parsedRD = api.parseFromString(api.RD, rdSource)
		return parsedRD.getById("ol")


class ObscoreLikeTest(testhelpers.VerboseTest):
	"""Tests on the table definition produced by _ObscoreLikeTD.
	"""
	resources = [("td", _ObscoreLikeTD())]

	def _getExpandedClause(self):
		"""returns the table's obscoreClause property after passing it
		through the table definition's expand method.
		"""
		rawClause = self.td.getProperty("obscoreClause")
		return self.td.expand(rawClause)

	def testObscoreColumnsPresent(self):
		obsIdColumn = self.td.getColumnByName("obs_id")
		self.assertEqual(
			obsIdColumn.description,
			"Unique identifier for an observation")

	def testPurgedColumnsMissing(self):
		expectedMessage = (
			"column 'dataproduct_type' could not be located in table ol")
		self.assertRaisesWithMsg(
			api.NotFoundError,
			expectedMessage,
			self.td.getColumnByName,
			("dataproduct_type",))

	def testObscoreClause(self):
		clause = self._getExpandedClause()
		for fragment in [
				"CAST('image' AS text) AS dataproduct_type,",
				"	CAST(calib_level AS smallint) AS calib_level,"]:
			self.assertTrue(fragment in clause)

	def testTableName(self):
		clause = self._getExpandedClause()
		self.assertTrue("CAST('test.ol' AS text) AS source_table" in clause)


class ObscoreDroppingTest(testhelpers.VerboseTest):
	"""Tests for what happens when //obscore is dropped.
	"""
	# make sure you don't commit anything here, as other tests rely on
	# having obscore live
	def testDMUndeclared(self):
		"""dropping //obscore removes its tap_schema.supportedmodels entry.
		"""
		from gavo.user import dropping

		# stand-in for the option object _do_dropRD receives
		class opts:
			systemImport = True
			force = False
			dropAll = True

		with base.getWritableAdminConn() as conn:
			try:
				dropping._do_dropRD(opts, "//obscore", conn)
				res = list(conn.query(
					"SELECT dmivorn FROM tap_schema.supportedmodels"
					" WHERE dmname ILIKE 'obscore%%'"))
				self.assertEqual(res, [])
			finally:
				# Undo the drop even when dropping or the assertion fails;
				# we do not want to rely on what the connection's context
				# manager does on exit.
				conn.rollback()


# When run as a script, hand ObscoreModificationTest to testhelpers.main.
if __name__=="__main__":
	testhelpers.main(ObscoreModificationTest)
