summaryrefslogtreecommitdiffstats
path: root/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py
diff options
context:
space:
mode:
Diffstat (limited to 'tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py')
-rw-r--r--tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py271
1 files changed, 0 insertions, 271 deletions
diff --git a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py b/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py
deleted file mode 100644
index 1561dab32..000000000
--- a/tmk_core/tool/mbed/mbed-sdk/workspace_tools/test_mysql.py
+++ /dev/null
@@ -1,271 +0,0 @@
-"""
-mbed SDK
-Copyright (c) 2011-2014 ARM Limited
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
-"""
-
-import re
-import MySQLdb as mdb
-
-# Imports from TEST API
-from workspace_tools.test_db import BaseDBAccess
-
-
-class MySQLDBAccess(BaseDBAccess):
- """ Wrapper for MySQL DB access for common test suite interface
- """
- def __init__(self):
- BaseDBAccess.__init__(self)
- self.DB_TYPE = 'mysql'
-
- def detect_database(self, verbose=False):
- """ detect database and return VERION data structure or string (verbose=True)
- """
- query = 'SHOW VARIABLES LIKE "%version%"'
- rows = self.select_all(query)
- if verbose:
- result = []
- for row in rows:
- result.append("\t%s: %s"% (row['Variable_name'], row['Value']))
- result = "\n".join(result)
- else:
- result = rows
- return result
-
- def parse_db_connection_string(self, str):
- """ Parsing SQL DB connection string. String should contain:
- - DB Name, user name, password, URL (DB host), name
- Function should return tuple with parsed (host, user, passwd, db) or None if error
- E.g. connection string: 'mysql://username:password@127.0.0.1/db_name'
- """
- result = BaseDBAccess().parse_db_connection_string(str)
- if result is not None:
- (db_type, username, password, host, db_name) = result
- if db_type != 'mysql':
- result = None
- return result
-
- def is_connected(self):
- """ Returns True if we are connected to database
- """
- return self.db_object is not None
-
- def connect(self, host, user, passwd, db):
- """ Connects to DB and returns DB object
- """
- try:
- self.db_object = mdb.connect(host=host, user=user, passwd=passwd, db=db)
- # Let's remember connection credentials
- self.db_type = self.DB_TYPE
- self.host = host
- self.user = user
- self.passwd = passwd
- self.db = db
- except mdb.Error, e:
- print "Error %d: %s"% (e.args[0], e.args[1])
- self.db_object = None
- self.db_type = None
- self.host = None
- self.user = None
- self.passwd = None
- self.db = None
-
- def connect_url(self, db_url):
- """ Connects to database using db_url (database url parsing),
- store host, username, password, db_name
- """
- result = self.parse_db_connection_string(db_url)
- if result is not None:
- (db_type, username, password, host, db_name) = result
- if db_type == self.DB_TYPE:
- self.connect(host, username, password, db_name)
-
- def reconnect(self):
- """ Reconnects to DB and returns DB object using stored host name,
- database name and credentials (user name and password)
- """
- self.connect(self.host, self.user, self.passwd, self.db)
-
- def disconnect(self):
- """ Close DB connection
- """
- if self.db_object:
- self.db_object.close()
- self.db_object = None
- self.db_type = None
-
- def escape_string(self, str):
- """ Escapes string so it can be put in SQL query between quotes
- """
- con = self.db_object
- result = con.escape_string(str)
- return result if result else ''
-
- def select_all(self, query):
- """ Execute SELECT query and get all results
- """
- con = self.db_object
- cur = con.cursor(mdb.cursors.DictCursor)
- cur.execute(query)
- rows = cur.fetchall()
- return rows
-
- def insert(self, query, commit=True):
- """ Execute INSERT query, define if you want to commit
- """
- con = self.db_object
- cur = con.cursor()
- cur.execute(query)
- if commit:
- con.commit()
- return cur.lastrowid
-
- def get_next_build_id(self, name, desc='', location='', type=None, status=None):
- """ Insert new build_id (DB unique build like ID number to send all test results)
- """
- if status is None:
- status = self.BUILD_ID_STATUS_STARTED
-
- if type is None:
- type = self.BUILD_ID_TYPE_TEST
-
- query = """INSERT INTO `%s` (%s_name, %s_desc, %s_location, %s_type_fk, %s_status_fk)
- VALUES ('%s', '%s', '%s', %d, %d)"""% (self.TABLE_BUILD_ID,
- self.TABLE_BUILD_ID,
- self.TABLE_BUILD_ID,
- self.TABLE_BUILD_ID,
- self.TABLE_BUILD_ID,
- self.TABLE_BUILD_ID,
- self.escape_string(name),
- self.escape_string(desc),
- self.escape_string(location),
- type,
- status)
- index = self.insert(query) # Provide inserted record PK
- return index
-
- def get_table_entry_pk(self, table, column, value, update_db=True):
- """ Checks for entries in tables with two columns (<TABLE_NAME>_pk, <column>)
- If update_db is True updates table entry if value in specified column doesn't exist
- """
- # TODO: table buffering
- result = None
- table_pk = '%s_pk'% table
- query = """SELECT `%s`
- FROM `%s`
- WHERE `%s`='%s'"""% (table_pk,
- table,
- column,
- self.escape_string(value))
- rows = self.select_all(query)
- if len(rows) == 1:
- result = rows[0][table_pk]
- elif len(rows) == 0 and update_db:
- # Update DB with new value
- result = self.update_table_entry(table, column, value)
- return result
-
- def update_table_entry(self, table, column, value):
- """ Updates table entry if value in specified column doesn't exist
- Locks table to perform atomic read + update
- """
- result = None
- con = self.db_object
- cur = con.cursor()
- cur.execute("LOCK TABLES `%s` WRITE"% table)
- table_pk = '%s_pk'% table
- query = """SELECT `%s`
- FROM `%s`
- WHERE `%s`='%s'"""% (table_pk,
- table,
- column,
- self.escape_string(value))
- cur.execute(query)
- rows = cur.fetchall()
- if len(rows) == 0:
- query = """INSERT INTO `%s` (%s)
- VALUES ('%s')"""% (table,
- column,
- self.escape_string(value))
- cur.execute(query)
- result = cur.lastrowid
- con.commit()
- cur.execute("UNLOCK TABLES")
- return result
-
- def update_build_id_info(self, build_id, **kw):
- """ Update additional data inside build_id table
- Examples:
- db.update_build_id_info(build_id, _status_fk=self.BUILD_ID_STATUS_COMPLETED, _shuffle_seed=0.0123456789):
- """
- if len(kw):
- con = self.db_object
- cur = con.cursor()
- # Prepare UPDATE query
- # ["`mtest_build_id_pk`=[value-1]", "`mtest_build_id_name`=[value-2]", "`mtest_build_id_desc`=[value-3]"]
- set_list = []
- for col_sufix in kw:
- assign_str = "`%s%s`='%s'"% (self.TABLE_BUILD_ID, col_sufix, self.escape_string(str(kw[col_sufix])))
- set_list.append(assign_str)
- set_str = ', '.join(set_list)
- query = """UPDATE `%s`
- SET %s
- WHERE `mtest_build_id_pk`=%d"""% (self.TABLE_BUILD_ID,
- set_str,
- build_id)
- cur.execute(query)
- con.commit()
-
- def insert_test_entry(self, build_id, target, toolchain, test_type, test_id, test_result, test_output, test_time, test_timeout, test_loop, test_extra=''):
- """ Inserts test result entry to database. All checks regarding existing
- toolchain names in DB are performed.
- If some data is missing DB will be updated
- """
- # Get all table FK and if entry is new try to insert new value
- target_fk = self.get_table_entry_pk(self.TABLE_TARGET, self.TABLE_TARGET + '_name', target)
- toolchain_fk = self.get_table_entry_pk(self.TABLE_TOOLCHAIN, self.TABLE_TOOLCHAIN + '_name', toolchain)
- test_type_fk = self.get_table_entry_pk(self.TABLE_TEST_TYPE, self.TABLE_TEST_TYPE + '_name', test_type)
- test_id_fk = self.get_table_entry_pk(self.TABLE_TEST_ID, self.TABLE_TEST_ID + '_name', test_id)
- test_result_fk = self.get_table_entry_pk(self.TABLE_TEST_RESULT, self.TABLE_TEST_RESULT + '_name', test_result)
-
- con = self.db_object
- cur = con.cursor()
-
- query = """ INSERT INTO `%s` (`mtest_build_id_fk`,
- `mtest_target_fk`,
- `mtest_toolchain_fk`,
- `mtest_test_type_fk`,
- `mtest_test_id_fk`,
- `mtest_test_result_fk`,
- `mtest_test_output`,
- `mtest_test_time`,
- `mtest_test_timeout`,
- `mtest_test_loop_no`,
- `mtest_test_result_extra`)
- VALUES (%d, %d, %d, %d, %d, %d, '%s', %.2f, %.2f, %d, '%s')"""% (self.TABLE_TEST_ENTRY,
- build_id,
- target_fk,
- toolchain_fk,
- test_type_fk,
- test_id_fk,
- test_result_fk,
- self.escape_string(test_output),
- test_time,
- test_timeout,
- test_loop,
- self.escape_string(test_extra))
- cur.execute(query)
- con.commit()