# -*- coding: utf-8 -*-
"""
Twisted trial-based tests for TAP and UWS.

Tests not requiring the TAP renderer go to taptest.py

The real big integration tap tests use the HTTP interface and are in
an external resource package taptest.
"""

#c Copyright 2008-2019, the GAVO project
#c
#c This program is free software, covered by the GNU GPL.  See the
#c COPYING file in the source distribution.


import atexit
import datetime
import httplib
import re
import os
import sys
import unittest
import urllib
import urlparse
import warnings

from nevow import context
from nevow import flat
from nevow import testutil
from nevow import url
from nevow import util
from twisted.internet import reactor
from twisted.python import threadable
threadable.init()

from gavo.helpers import trialhelpers

from gavo import base
from gavo import rscdesc
from gavo.protocols import scs  # for table's q3c mixin
from gavo.web import weberrors
from gavo.web.taprender import TAPRenderer

base.DEBUG = True


votWithOldGeom = 'eJxdUctugzAQvOcrVj72YAfUEwIkClRCigIiNNdqC26DRDCxHUj69bVDUKNcPLPa2dmH/X1eRW+b\nFEYuVSv6gDjUJSu4HLteBeSg9eAxNk0TbUeBtOeamRQzVfjVcTZadbjyy3SXf5RxaujNzuB7lm4S\nyJKAKIWfnahRG38CKCVeVfvLA/JCoEGN+jqYoD6gJNDjkT9XnOsmIINQlJ9M0Lc6IA3/MXQuVLr2\nIqWliIWQjaKFUK0tdBO6x+7MXQKXWYnNqfOKPNtWduYk3cVlVlRZvg2zuNzB0hHEN+DApT5LDjXv\nNZc+e1SvfHbbzppEVbQsPXMAvyotWJKEyzRwa+HS9doB59WCz0zaypnVm/ffw7S743JN9nBhdv+z\n8A+9NIXB\n'
votWithDALIGeom = 'eJxdkcFugzAMhu88hZUHSFq2UwVIDJiEVI0KWK9TBpmGRBOWBGj39HNK0ape8tvJ599xEhyLOn7Z\nZzAJbTolQ7KlT8SD86mXJiTf1g47xuZ5pt2kOJXCMjxiWMU/e8EmR0deUGZV8V4mGYZXO9TXPNun\nkKchMYZ/9KrhFv0JcK35xXS/IiQ+gZZbbi8DJq0a0ZGA5CfxWDM2bUgGZaj4wWThjW12sbFaJUrp\n1tCDMp2j/ZQeeT8KND8v5KA6ad0t06xKyvxQ58VblCdlBWsHUF/AB6HtqAU0QlqhA3ZPewG7zuNM\n4jpex1xigKAunbggjXy62Wxh++wkYLjhAOYIXP+rsMFN1xdjd6/Ibv8Sed4fDoV9KQ==\n'


class TAPRenderTest(trialhelpers.RenderTest):
	_tapService = base.caches.getRD("__system__/tap").getById("run")
	@property
	def renderer(self):
		ctx = trialhelpers.getRequestContext("/async")
		return TAPRenderer(ctx, self._tapService)


class SyncMetaTest(TAPRenderTest):
	"""tests for non-doQuery sync queries.
	"""
	def testVersionRejected(self):
		"""requests with bad version are rejected.
		"""
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "getCapabilities", "VERSION": "0.1"},
			['<INFO name="QUERY_STATUS" value="ERROR">Version mismatch'])

	def testNoSyncPaths(self):
		"""segments below sync are 404.
		"""
		return trialhelpers.runQuery(self.renderer, "GET", "/sync/foo/bar", {}
			).addCallback(lambda res: ddt
			).addErrback(lambda res: None)

	def testCapabilities(self):
		"""simple get capabilities response looks ok.
		"""
		return self.assertGETHasStrings("/sync", 
			{"REQUEST": "getCapabilities"}, [
				'<capability standardID="ivo://ivoa.net/std/TAP', 
				'ParamHTTP">'])


class SyncQueryTest(TAPRenderTest):
	"""tests for querying sync queries.
	"""
	aVOTable = os.path.join(base.getConfig("inputsdir"), 
		"data/vizier_votable.vot")

	def testNoLangRejected(self):
		return self.assertGETHasStrings("/sync", {
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<3'
			}, [
				"<INFO", "Required parameter 'lang' missing.</INFO>"])

	def testBadLangRejected(self):
		return self.assertGETHasStrings("/sync", {
				"LANG": "Furz",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<3'
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">This service does'
				' not support the query language Furz'])

	def testSimpleQuery(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<2'
			}, [
				'<RESOURCE type="results"',
				'ucd="pos.eq.ra;meta.main"',
				'ID="alpha"',
				'unit="deg"',
				'name="alpha"'])

	def testOverflowAndCast(self):
		return self.assertGETHasStrings("/sync", {
				"LANG": "ADQL",
				"MAXREC": "1",
				"QUERY": 'SELECT cast(alpha as national char(3)) as foo FROM test.adql',
				"FORMAT": "votable/td",
			}, [
				'<INFO name="QUERY_STATUS" value="OVERFLOW"', 
				'datatype="unicodeChar"', 'arraysize="*"',
				'sample RA -- *TAINTED*',
				'<TD>2  </TD>'])

	def testNoOverflowWithTOP(self):
		return self.assertGETLacksStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"MAXREC": "1",
				"QUERY": 'SELECT TOP 1 alpha FROM test.adql'
			}, [
				'value="OVERFLOW"', ])

	def testMAXREC0(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"MAXREC": "0",
				"QUERY": 'SELECT alpha FROM test.adql'
			}, [
				'Query successful', 
				'<COOSYS',
				'datatype="float"'])

	def testBadFormat(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha FROM test.adql WHERE alpha<2',
				"RESPONSEFORMAT": 'xls'
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">Field RESPONSEFORMAT:'
				' Unsupported format \'xls\'', 'Legal format codes include'])

	def testClearVOT(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT CIRCLE(\'ICRS\', alpha, delta+1e-10, 10)'
					' FROM test.adql WHERE alpha<3',
				"FORMAT": "votable/td"
			}, [
				'<TABLEDATA><TR><TD>2.0', '14.0', '10.0',
				'xtype="circle"'])

	def testCSV(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha,delta FROM test.adql WHERE alpha<3',
				"FORMAT": "text/csv"
			}, [
				'2.0,14.0'])

	def testTSV(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"FORMAT": "TSV"
			}, [
				'2.0\t14.0'])

	def testJSON(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": "SELECT TOP 3 column_name, datatype FROM TAP_SCHEMA.columns"
					" WHERE table_name='tap_schema.tables' ORDER BY column_name",
				"RESPONSEFORMAT": "application/json",
				"MAXREC": "1"
			}, [
				'"contains": "table"',
				'"description": "ADQL datatype"',
# Note: this result isn't quite right -- MAXREC=1 in our current implementation
# returns 2 rows.  Perhaps we should change that some day.
				'[["description", "char"], ["schema_name", "char"]]',
				'"queryStatus": "OVERFLOW"'])

	def testBin2Table(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"FORMAT": "application/x-votable+xml;serialization=BINARY2"
			}, [
				'BINARY2', 'AEAAAABBYAAA'])

	def testFITSTable(self):
		return self.assertGETHasStrings("/sync", {
				"REQUEST": "doQuery",
				"LANG": "ADQL",
				"QUERY": 'SELECT alpha, delta FROM test.adql WHERE alpha<3',
				"RESPONSEFORMAT": "fits",
				"MAXREC": "0",
			}, [
				'SIMPLE  =',
				"TYPE1  = 'alpha   '",
				"TCOMM1  = 'A sample RA'"])

	def testBadUploadSyntax(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM test.adql'
			}, [
				"only allow regular SQL identifiers"])

	def testBadUploadSyntax2(self):
		return self.assertPOSTHasStrings("/sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "bar,http://x.y;",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			"only allow regular SQL identifiers"
			])

	def testNonExistingUpload(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,http://127.0.0.1:65000",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM test.adql'
			}, [
				"'http://127.0.0.1:65000' cannot be retrieved</INFO",
				"Connection refused"])

	def testUploadCannotReadLocalFile(self):
		return self.assertPOSTHasStrings("/sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "bar,file:///etc/passwd",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			"'file:///etc/passwd' cannot be retrieved</INFO",
			"unknown url type"
			]).addErrback(lambda failure: None)

	def testMalformedUploadURL(self):
		return self.assertPOSTHasStrings("/sync", {
			"REQUEST": "doQuery",
			"UPLOAD": "http://fit://file://x.ab",
			"LANG": "ADQL",
			"QUERY": 'SELECT * FROM test.adql'}, [
			'<INFO name="QUERY_STATUS" value="ERROR">Field UPLOAD:'
			' Syntax error in UPLOAD parameter'
			])

	def testInlineUploadFromArgsWorks(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,param:HoNk",
				"LANG": "ADQL",
				"QUERY": 'SELECT * FROM tap_upload.bar join test.adql on'
					' (alpha-"_RAJ2000"<0)',
				"HoNk": open(self.aVOTable).read(),
			}, [
				'xmlns="http://www.ivoa.net/xml/VOTable/',
				'ucd="pos.eq.ra;meta.main"',
				'encoding="base64"'])

	def testMissingInlineParameter(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "bar,param:HoNk",
				"LANG": "ADQL",
				"QUERY": 'SELECT top 1 * FROM tap_upload.bar',
				"MoNk": open(self.aVOTable).read(),
			}, [
				'<INFO name="QUERY_STATUS" value="ERROR">No parameter for'
				' upload table bar'])

	def testWithUploadOldGeom(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "t1,param:HoNk",
				"LANG": "ADQL",
				"FORMAT": "votable/td",
				"QUERY": "SELECT ssa_location,"
				  " POINT('ICRS', COORD1(ssa_location), COORD2(ssa_location)) AS np"
				  " FROM test.adql join tap_upload.t1"
					" ON 1=contains(ssa_location, circle('', alpha, delta, 0.002))",
				"HoNk": votWithOldGeom.decode("base64").decode("zlib")
			}, [
				"<TD>Position UNKNOWNFrame 2.0009",
				"14.00100",
				"<TD>2.00", " 14.00"])

	def testWithUploadGeom(self):
		return self.assertPOSTHasStrings("/sync", {
				"REQUEST": "doQuery",
				"UPLOAD": "t1,param:HoNk",
				"LANG": "ADQL",
				"FORMAT": "votable/td",
				"QUERY": 'SELECT * FROM test.adql join tap_upload.t1'
					" on 1=contains(ssa_location, circle('', alpha, delta, 0.002))",
				"HoNk": votWithDALIGeom.decode("base64").decode("zlib")
			}, ["<TD>2.00", " 14.00"])

	def testArraySerialisation(self):
		return self.assertPOSTHasStrings("/sync", {
			"REQUEST": "doQuery",
			"LANG": "ADQL",
			"QUERY": 'SELECT gavo_histogram(alpha, 0, 360, 4) as h FROM test.adql'}, [
			'AAAABgAAAAAAAAACAAAAAAAAAAAAAAABAAAAAA==',
			'arraysize="*" datatype="int" name="h"'])


class SimpleAsyncTest(TAPRenderTest):
	"""tests for some non-ADQL async queries.
	"""
	def testVersionRejected(self):
		"""requests with bad version are rejected.
		"""
		return self.assertPOSTHasStrings("/async", {
				"REQUEST": "getCapabilities",
				"VERSION": "0.1"},
			['<INFO name="QUERY_STATUS" value="ERROR">Version mismatch'])

	def testJobList(self):
		return self.assertGETHasStrings("/async", {}, [
			'<uws:jobs ',
			' xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0" ',
			' version="1.1" ',
			])

	def testNonExistingPhase(self):
		return self.assertGETHasStrings("/async/23/phase", {},
			['<VOTABLE ', 'ERROR">UWS job \'23\' could not'])

	def testLifecycle(self):
		"""tests job creation, redirection, phase, and deletion.
		"""
		# This one is too huge and much too slow for a unit test.  Still
		# I want at least one integration-type test in here since the
		# big test probably won't be run at every commit.
		def assertDeleted(result, jobId):
			self.assertEqual(result[1].code, 303)
			next = result[1].headers_out["location"][len(
				self._tapService.getURL("tap")):]
			self.assertEqual(next, "/async",
				"Deletion redirect doesn't point to job list but to %s"%next)
			return self.assertGETLacksStrings(next, {}, ['jobref id="%s"'%jobId]
			).addCallback(lambda res: reactor.disconnectAll())

		def delete(jobId):
			return trialhelpers.runQuery(self.renderer, "DELETE", "/async/"+jobId, {}
			).addCallback(assertDeleted, jobId)

		def assertStarted(lastRes, jobId):
			# lastRes must be a redirect to the job info page
			req = lastRes[1]
			self.assertEqual(req.code, 303)
			self.assertEqual(req.headers_out["location"], 
				 "http://localhost:8080/__system__/tap/run/tap/async/"+jobId)
			return delete(jobId)

		def promote(ignored, jobId):
			return trialhelpers.runQuery(self.renderer, "POST", 
				"/async/%s/phase"%jobId, {"PHASE": "RUN"}
			).addCallback(assertStarted, jobId)

		def checkPlan(ignored, jobId):
			return self.assertGETHasStrings("/async/%s/plan"%jobId, {},
				["<plan:operation><plan:description>Limit</plan:description>"]
			).addCallback(promote, jobId)

		def checkQuote(ingored, jobId):
			return self.assertGETHasStrings("/async/%s/quote"%jobId, {},
				['-']
				).addCallback(checkPlan, jobId)

		def checkPhase(jobId):
			return self.assertGETHasStrings("/async/%s/phase"%jobId, {},
				['PENDING']
				).addCallback(checkQuote, jobId)

		def checkPosted(result):
			# jobId is in location of result[1]
			request = result[1]
			self.assertEqual(request.code, 303)
			next = request.headers_out["location"]
			self.failIf("/async" not in next)
			jobId = next.split("/")[-1]
			return checkPhase(jobId)

		return trialhelpers.runQuery(self.renderer, "POST", "/async", {
			"LANG": "ADQL", 
			"QUERY": "SELECT alpha FROM test.adql WHERE alpha<3"}
		).addCallback(checkPosted)

	def testBadConstructionArgument(self):
		def checkPosted(result):
			request = result[1]
			self.assertEqual(request.code, 400)
			self.failUnless("base 10: 'kaputt" in result[0])
			# it would be cool if we could check that the job has actually
			# not been created -- but even looking at the DB that's not trivial
			# to do reliably.

		return trialhelpers.runQuery(self.renderer, "POST", "/async", {
			"LANG": "ADQL", 
			"MAXREC": "kaputt",
			"QUERY": "SELECT ra FROM test.adql WHERE ra<3"}
		).addCallback(checkPosted)

	def testJoblistLAST(self):
		def assertMaxOneResult(result):
			self.assertEqual(result[1].code, 200)
			self.assertTrue('xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall('<uws.jobref ', result[0]))
			self.assertTrue(nJobs<2, "LAST=1 returns %s jobrefs"%nJobs)

		return trialhelpers.runQuery(self.renderer, "GET", "/async", {
			"LAST": 1}
			).addCallback(assertMaxOneResult)

	def testJoblistAFTER(self):
		def assertNoResult(result):
			self.assertEqual(result[1].code, 200)
			self.assertTrue('xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall('<uws.jobref ', result[0]))
			self.assertEqual(nJobs, 0)

		return trialhelpers.runQuery(self.renderer, "GET", "/async", {
			"AFTER": (datetime.datetime.utcnow()
				+datetime.timedelta(seconds=1)).isoformat()}
			).addCallback(assertNoResult)

	def testJoblistPHASE(self):
		def assertMaxOneResult(result):
			self.assertEqual(result[1].code, 200)
			self.assertTrue('xmlns:uws="http://www.ivoa.net/xml/UWS/v1.0',
				result[0])
			nJobs = len(re.findall('<uws.jobref ', result[0]))
			self.assertTrue(nJobs==0, "PHASE=ABORTED returns a jobref?")

		return trialhelpers.runQuery(self.renderer, "GET", "/async", {
			"PHASE": 'ABORTED'}
			).addCallback(assertMaxOneResult)



atexit.register(trialhelpers.provideRDData("test", "ADQLTest"))
