Source code for nzpyida.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#-----------------------------------------------------------------------------
# Copyright (c) 2015-2023, IBM Corp.
# All rights reserved.
#
# Distributed under the terms of the BSD Simplified License.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------

# For local installation do "pip install -e" in the ibmdbpy-master directory
###############################################################################

"""
An IdaDataBase instance represents a reference to a remote Netezza Warehouse database
maintaining attributes and methods for administration of the database.
"""
import os
#from os import path
import sys
import math
import random
from time import time
import datetime
import warnings
from copy import deepcopy

from collections import OrderedDict


import numpy as np
import pandas as pd
from pandas.io.sql import read_sql

from lazy import lazy
import six

import nzpyida
from nzpyida import sql
from nzpyida.utils import timed, set_verbose, set_autocommit
from nzpyida.exceptions import IdaDataBaseError, PrimaryKeyError

[docs] class IdaDataBase(object): """ An IdaDataBase instance represents a reference to a remote Netezza Warehouse database. This is an abstraction layer for the remote connection. The IdaDataBase interface provides several functions that enable basic database administration in pythonic syntax. You can use either ODBC or JDBC or NZPY to connect to the database. The default connection type is ODBC, which is the standard connection type for Windows users. To establish an ODBC connection, download an IBM Netezza driver and set up your ODBC connection by specifying your connection protocol, port, and hostname. An ODBC connection on Linux or Mac might require more settings. For more information about how to establish an ODBC connection, see the pypyodbc documentation. To connect with JDBC, install the optional external package jaydebeapi, download the ibm jdbc driver, and save it in your local nzpyida folder. If you put the jdbc driver in the CLASSPATH variable or the folder that contains it, it will work too. A C++ compiler adapted to the current python version, operating system, and architecture may also be required to install jaydebeapi. To connect with NZPY, specify the required parameters in dict format, there is no need to install any drivers. The instantiation of an IdaDataBase object is a mandatory step before creating IdaDataFrame objects because IdaDataFrames require an IdaDataBase as a parameter to be initialized. By convention, use only one instance of IdaDataBase per database. However, you can use several instances of IdaDataFrame per connection. """
[docs] def __init__(self, dsn, uid='', pwd='', autocommit=True, verbose=False): """ Open a database connection. Parameters ---------- dsn : str Data Source Name (as specified in your ODBC settings) or JDBC URL string or NZPY dict uid : str, optional User ID. pwd : str, optional User password. autocommit : bool, default: True If True, automatically commits all operations. verbose : bool, defaukt: True If True, prints all SQL requests that are sent to the database. Attributes ---------- data_source_name : str Name of the referring DataBase. _con_type : str Type of the connection, either 'odbc' or 'jdbc' or 'nzpy'. _connection_string : str Connection string use for connecting via ODBC or JDBC or NZPY. _con : connection object Connection object to the remote Database. _database_system: str Underlying database system, either 'db2' or 'netezza' _database_name: str The name of the database the application is connected to. _idadfs : list List of IdaDataFrame objects opened under this connection. Returns ------- IdaDataBase object Raises ------ ImportError JayDeBeApi is not installed. IdaDataBaseError * uid and pwd are defined both in uid, pwd parameters and dsn. * The 'db2jcc4.jar' file is not in the ibmdbpy site-package repository. Examples -------- ODBC connection, userID and password are stored in ODBC settings: >>> IdaDataBase(dsn="BLUDB") # ODBC Connection <ibmdbpy.base.IdaDataBase at 0x9bec860> ODBC connection, userID and password are not stored in ODBC settings: >>> IdaDataBase(dsn="BLUDB", uid="<UID>", pwd="<PWD>") <ibmdbpy.base.IdaDataBase at 0x9bec860> JDBC connection, full JDBC string: >>> jdbc='jdbc:db2://<HOST>:<PORT>/<DBNAME>:user=<UID>;password=<PWD>' >>> IdaDataBase(dsn=jdbc) <ibmdbpy.base.IdaDataBase at 0x9bec860> JDBC connectiom, JDBC string and seperate userID and password: >>> jdbc = 'jdbc:db2://<HOST>:<PORT>/<DBNAME>' >>> IdaDataBase(dsn=jdbc, uid="<UID>", pwd="<PWD>") <ibmdbpy.base.IdaDataBase at 0x9bec860> NZPY connection, userID and password are stored in NZPY dict: nzpy_dsn ={ "database":"<DBNAME>", "port" :<PORT>, "host" : "<HOST>", "securityLevel":<SECURITYLEVEL>, "logLevel":<LOGLEVEL>, "user":"<UID>", "password":"<PWD>" } >>> IdaDataBase(dsn=nzpy_dsn) <ibmdbpy.base.IdaDataBase at 0x9bec860> NZPY connection, NZPY dict and seperate userID and password nzpy_dsn ={ "database":"<DBNAME>", "port" :<PORT>, "host" : "<HOST>", "securityLevel":<SECURITYLEVEL>, "logLevel":<LOGLEVEL> } >>> IdaDataBase(dsn=nzpy_dsn, uid="<UID>", pwd="<PWD>") """ if isinstance(dsn, dict)==False: for arg,name in zip([dsn, uid, pwd],['dsn','uid','pwd']): if not isinstance(arg, six.string_types): raise TypeError("Argument '%s' of type %, expected : string type."%(name,type(arg))) self.data_source_name = dsn # default value for _database_system is db2 self._database_system = 'db2' # first delimiter before the parameters in the jdbc-url url_1stparam_del = ':' # Detect if user attempt to connection with ODBC or JDBC if isinstance(dsn,str): if dsn.startswith('jdbc:'): self._con_type = "jdbc" if dsn.startswith('jdbc:netezza:'): self._database_system = 'netezza' # for Netezza the internal connection is always in autocommit mode # to allow explicit commits if not autocommit: dsn += ';autocommit=false' url_1stparam_del = ';' elif dsn.startswith('jdbc:db2:'): self._database_system = 'db2' else: raise IdaDataBaseError(("The JDBC connection string is invalid for Db2 and Netezza. " + "It has to start either with 'jdbc:db2:' or 'jdbc:netezza:'.")) else: self._con_type='odbc' elif isinstance(dsn, dict): self._con_type = "nzpy" self._idadfs = [] if self._con_type == 'nzpy': #parse dict host='' port=0 database='' securityLevel=0 logLevel=0 user ='' password='' for key,value in dsn.items(): if(key=='host'): host=value if (key=='port'): port=value if (key=='database'): database=value if (key=='securityLevel') : securityLevel=value if (key=='logLevel') : logLevel=value if (key == 'user'): user = value if (key == 'password'): password = value import nzpy; missingCredentialsMsg = ("Missing credentials to connect via NZPY.") ambiguousDefinitionMsg = ("Ambiguous definition of userID or password: " + "Cannot be defined in uid and pwd parameters " + "and in nzpy dict at the same time.") uidSpecified = uid != '' pwdSpecified = pwd != '' if not ('user' in dsn): if (uidSpecified): user= uid elif (uidSpecified): raise IdaDataBaseError(ambiguousDefinitionMsg) uidSpecified = uidSpecified | ('user' in dsn) if not ('password' in dsn): if (pwdSpecified): password= pwd elif (pwdSpecified): raise IdaDataBaseError(ambiguousDefinitionMsg) pwdSpecified = pwdSpecified | ('password' in dsn) # throw an exception if either uid or pwd are specified, # i.e. not both or none of the two if (uidSpecified ^ pwdSpecified): raise IdaDataBaseError(missingCredentialsMsg) try: if port <= 0: port = 5480 self._con = nzpy.connect(user=user, password=password,host=host, port=port, database=database, securityLevel=securityLevel,logLevel=logLevel) except Exception as e: raise IdaDataBaseError(str(e)) self._connection_string ={'user':user,'password':password,'host':host, 'port':port, 'database':database, 'securityLevel':securityLevel,'logLevel':logLevel} self._database_system = 'netezza' self._con.autocommit = autocommit if self._con_type == 'odbc' : self._connection_string = "DSN=%s; UID=%s; PWD=%s;LONGDATACOMPAT=1;"%(dsn,uid,pwd) """ Workaround for CLOB retrieval: Set the CLI/ODBC LongDataCompat keyword to 1. Doing so will force the CLI driver to make the following data type mappings SQL_CLOB to SQL_LONGVARCHAR SQL_BLOB to SQL_LONGVARBINARY SQL_DBCLOB to SQL_WLONGVARCHAR """ import pyodbc try: self._con = pyodbc.connect(self._connection_string, autocommit=autocommit) self._con.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8') self._con.setdecoding(pyodbc.SQL_WCHAR, encoding='utf-8') self._con.setencoding(encoding='utf-8') except Exception as e: raise IdaDataBaseError(str(e)) try: self.ida_query("select count(*) from _V_OBJECT") self._database_system = 'netezza' except Exception as e1: try: self.ida_query("select CURRENT_SERVER from SYSIBM.SYSDUMMY1") self._database_system = 'db2' except Exception as e2: errorMsg = ("The following errors occurred when trying to determine " + "if the database system is Netezza or Db2:\n" + "%s\n%s") % (str(e1), str(e2)) raise IdaDataBaseError(errorMsg) if self._con_type == 'jdbc': missingCredentialsMsg = ("Missing credentials to connect via JDBC.") ambiguousDefinitionMsg = ("Ambiguous definition of userID or password: " + "Cannot be defined in uid and pwd parameters " + "and in jdbc_url_string at the same time.") # remove trailing ":" or ";" or spaces on dsn, we will replace. dsn = dsn.rstrip(';: ') # find parameters on dsn; if any exist, there will be an equals sign. ix = dsn.find("=") # if no parameters exist, then this is the complete dsn if (ix < 0): # nothing needs to be done, if uid and pwd are missing # (uid and pwd are not needed for local database connections) # otherwise both uid and pwd have to be specified if uid and pwd: dsn = dsn + url_1stparam_del + 'user={};password={};'.format(uid, pwd) elif not uid and not pwd: # Neither UID nor PWD have to be specified for local connections. # This assumes that if they're missing, the connection is local. pass else: raise IdaDataBaseError(missingCredentialsMsg) else: # if we know there is at least one parameter; we can assume there exists a ":" before the parameter # portion of the string in a correctly formatted dsn. Therefore, just check for the existence of # the uid and pwd and add if they are missing. If they are on the string, IGNORE the string as-is. uidSpecified = uid != '' pwdSpecified = pwd != '' if not ('user=' in dsn): if (uidSpecified): dsn = dsn + ';user=' + uid elif (uidSpecified): raise IdaDataBaseError(ambiguousDefinitionMsg) uidSpecified = uidSpecified | ('user=' in dsn) if not ('password=' in dsn): if (pwdSpecified): dsn = dsn + ';password=' + pwd elif (pwdSpecified): raise IdaDataBaseError(ambiguousDefinitionMsg) pwdSpecified = pwdSpecified | ('password=' in dsn) # throw an exception if either uid or pwd are specified, # i.e. not both or none of the two if (uidSpecified^pwdSpecified): raise IdaDataBaseError(missingCredentialsMsg) # add trailing ";" to dsn. dsn = dsn + ';' jdbc_url = dsn try: import jaydebeapi import jpype except ImportError: raise ImportError("Please install optional dependency jaydebeapi "+ "to work with JDBC.") # check versions if jaydebeapi.__version__.startswith('0'): # Older JaydeBeAPi versions where the connection information is specified through a list # are not supported anymore. # The connection information has to be included now in a single connection string. message = ("Your JayDeBeApi module is not supported anymore. Please install version 1.x or higher.") raise IdaDataBaseError(message) if jpype.__version__ != '0.6.3': message = ("Your JPype1 version is not compatible with JayDeBeApi. Please install version 0.6.3.") raise IdaDataBaseError(message) here = os.path.abspath(os.path.dirname(__file__)) driver_not_found = "" if self._is_netezza_system(): driverlibs = ["nzjdbc3.jar"] else: driverlibs = ["db2jcc4.jar", "db2jcc.jar"] if (not jpype.isJVMStarted()): classpath = os.getenv('CLASSPATH','') jarpath = '' platform = sys.platform if verbose: print("Trying to find a path to the JDBC driver jar file in CLASSPATH (%s)."%classpath) if platform == 'win32': classpaths = classpath.split(';') else: classpaths = classpath.split(':') if any(dl for dl in driverlibs if dl in classpath): # A path to the driver exists in the classpath variable jarpaths = [jp for jp in classpaths if any([dl in jp for dl in driverlibs])] jarpath = "" while jarpaths: # check if a least one path exists jarpath = jarpaths.pop(0) # just take the first if os.path.isfile(jarpath): # Is the path correct? if platform == 'win32': jarpath = jarpath.split(':')[1].replace('\\', '/') # get rid of windows style formatting break else: if verbose: print("The path %s does not seem to be correct.\nTrying to recover..."%jarpath) jarpath = "" if not jarpath: # Means the search for a direct path in the CLASSPATH was not successful def _get_jdbc_driver_from_folders(folders): jarpath = '' if platform == 'win32': jarpaths = [fld.split(':')[1].replace('\\', '/') + "/" + dl for fld in folders for dl in driverlibs if os.path.isfile(fld + "/" + dl)] if jarpaths == []: # check if classpath contains paths with wildcard "*" at the end jarpaths = [fld.split(':')[1].replace('\\', '/')[:-1] + dl for fld in folders for dl in driverlibs if fld[-1:] == '*' and os.path.isfile(fld + "/" + dl)] else: jarpaths = [fld + "/" + dl for fld in folders for dl in driverlibs if os.path.isfile(fld + "/" + dl)] if jarpaths == []: # check if classpath contains paths with wildcard "*" at the end jarpaths = [fld[:-1] + dl for fld in folders for dl in driverlibs if fld[-1:] == '*' and os.path.isfile(fld[:-1] + dl)] if jarpaths: return jarpaths[0] if classpath: # There is at least something in the classpath variable # Let us see if the jar in a folder of the classpath if verbose: print("Trying to find the JDBC driver in the folders of CLASSPATH (%s)."%classpath) jarpath = _get_jdbc_driver_from_folders(classpaths) if not jarpath: # jarpath is still '' if verbose: print("Trying to find the JDBC driver in the local folder of ibmdbpy (%s)"%here) # Try to get the path to the driver from the ibmdbpy folder, the last chance jarpath = _get_jdbc_driver_from_folders([here]) if jarpath: if verbose: print("Found it at %s!\nTrying to connect..."%jarpath) jpype.startJVM(jpype.getDefaultJVMPath(), '-Djava.class.path=%s' % jarpath) if self._is_netezza_system(): driver_not_found = ("HELP: The Netezza JDBC driver library 'nzjdbc3.jar' could not be found. "+ "Please, follow the instructions on "+ "'https://www.ibm.com/support/knowledgecenter/en/SS5FPD_1.0.0/com.ibm.ips.doc/postgresql/odbc/c_datacon_plg_overview.html' "+ "for downloading and installing the JDBC driver "+ "and put the file 'nzjdbc3.jar' in the CLASSPATH variable "+ "or in a folder that is in the CLASSPATH variable. Alternatively place "+ "it in the folder '%s'."%here) else: driver_not_found = ("HELP: The JDBC driver for IBM Db2 could "+ "not be found. Please download the latest JDBC Driver at the "+ "following address: 'https://www.ibm.com/support/pages/node/382667' "+ "and put the file 'db2jcc.jar' or 'db2jcc4.jar' in the CLASSPATH variable "+ "or in a folder that is in the CLASSPATH variable. Alternatively place "+ "it in the folder '%s'."%here) self._connection_string = jdbc_url try: if self._is_netezza_system(): self._con = jaydebeapi.connect('org.netezza.Driver', self._connection_string) else: self._con = jaydebeapi.connect('com.ibm.db2.jcc.DB2Driver', self._connection_string) except Exception as e: print(driver_not_found) raise IdaDataBaseError(e) if verbose: print("Connection successful!") #add DB2GSE to the database FUNCTION PATH #query = "SET CURRENT FUNCTION PATH = CURRENT FUNCTION PATH, db2gse" #self.ida_query(query) #not anymore, reported problems with ODBC #better mention DB2GSE explicitly when accessing its functions # determine name of database if self._is_netezza_system(): self._database_name = self.ida_scalar_query('select OBJNAME from _T_OBJECT where OBJID = CURRENT_DB;') self._upper_cased = self.ida_scalar_query('select identifier_case;') == 'UPPERCASE' else: self._database_name = self.ida_scalar_query('select CURRENT_SERVER from SYSIBM.SYSDUMMY1') # Setting Autocommit and verbose environment variables set_autocommit(autocommit) set_verbose(verbose)
########################################################################### #### Data Exploration ###########################################################################
[docs] @lazy def current_schema(self): """ Get the current user schema name as a string. Returns ------- str User's schema name. Examples -------- >>> idadb.current_schema() 'DASHXXXXXX' """ if self._is_netezza_system(): query = 'select TRIM(CURRENT_SCHEMA)' else: query = "SELECT TRIM(CURRENT_SCHEMA) FROM SYSIBM.SYSDUMMY1" return self.ida_scalar_query(query)
[docs] def show_tables(self, show_all=False): """ Show tables and views that are available in self. By default, this function shows only tables that belong to a user’s specific schema. Parameters ---------- show_all : bool If True, all table and view names in the database are returned, not only those that belong to the user's schema. Returns ------- DataFrame A data frame containing tables and views names in self with some additional information (TABSCHEMA, TABNAME, OWNER, TYPE). Examples -------- >>> ida_db.show_tables() TABSCHEMA TABNAME OWNER TYPE 0 DASHXXXXXX SWISS DASHXXXXXX T 1 DASHXXXXXX IRIS DASHXXXXXX T 2 DASHXXXXXX VIEW_TITANIC DASHXXXXXX V ... >>> ida_db.show_tables(show_all = True) TABSCHEMA TABNAME OWNER TYPE 0 DASHXXXXXX SWISS DASHXXXXXX T 1 DASHXXXXXX IRIS DASHXXXXXX T 2 DASHXXXXXX VIEW_TITANIC DASHXXXXXX V 2 SYSTOOLS IDAX_MODELS DASH101631 A ... Notes ----- show_tables implements a cache strategy. The cache is stored when the user calls the method with the argument show_all set to True. This improves performance because database table look ups are a very common operation. The cache gets updated each time a table or view is created or refreshed, each time a table or view is deleted, or when a new IdaDataFrame is opened. """ #### DEVELOPERS FIX: UNCOMMENT WHEN DEAD LOCK #show_all = False #### # Try to retrieve the cache if show_all and not self._is_netezza_system(): cache = self._retrieve_cache("cache_show_tables") if cache is not None: return cache where_part = "" if not show_all: where_part = ("AND TABSCHEMA = '%s' " % self.current_schema) if self._is_netezza_system(): query = ("SELECT SCHEMA as TABSCHEMA, OBJNAME as TABNAME, OWNER, " + "CASE WHEN UPPER(OBJTYPE) = 'TABLE' THEN 'T' ELSE 'V' END AS TYPE " + "FROM _V_OBJECTS " + "WHERE UPPER(OBJTYPE) in ('TABLE', 'VIEW') " + where_part + "ORDER BY TABSCHEMA, TABNAME") else: query = ("SELECT DISTINCT TABSCHEMA, TABNAME, OWNER, TYPE " + "FROM SYSCAT.TABLES " + "WHERE OWNERTYPE = 'U' " + where_part + "ORDER BY TABSCHEMA, TABNAME") data = self.ida_query(query) # Workaround for some ODBC version which does not get the entire # string of the column name in the cursor descriptor. # This is hardcoded, so be careful data.columns = ['TABSCHEMA', 'TABNAME', 'OWNER', 'TYPE'] #data = self._upper_columns(data) # Db2 Warehouse FIX: schema "SAMPLES" and "GOSALES" saved with an extra blank, # By doing so, we delete the extra blank. # Note that this works because all cells are of type string. # OLD version, created an unexpected bug in some wrong ODBC version # TypeError: unorderable types: bytes() > int() # less pythonic, do not use anymore #for col in data: # for index, val in enumerate(data[col]): # data[col][index] = val.strip() # Can be done with a one liner :) data = data.apply(lambda x: x.apply(lambda x: x.strip())) # Cache the result if show_all is True: self.cache_show_tables = data return data
[docs] def show_models(self): """ Show models that are available in the database. Returns ------- DataFrame Examples -------- >>> idadb.show_models() MODELSCHEMA MODELNAME OWNER 0 DASHXXXXXX KMEANS_10857_1434974511 DASHXXXXXX 1 DASHXXXXXX KMEANS_11726_1434977692 DASHXXXXXX 2 DASHXXXXXX KMEANS_11948_1434976568 DASHXXXXXX """ if self._is_netezza_system(): list_models_stmt = ("SELECT MODELNAME, OWNER, CREATED, STATE, MININGFUNCTION, ALGORITHM, USERCATEGORY " + "FROM INZA.V_NZA_MODELS") #result_columns = ['modelname', 'owner', 'created', 'state','miningfunction', 'algorithm', 'usercategory'] else: list_models_stmt = "call IDAX.LIST_MODELS()" result_columns = ['modelschema', 'modelname', 'owner', 'created', 'state', 'miningfunction', 'algorithm', 'usercategory'] data = self.ida_query(list_models_stmt) #data.columns = result_columns # Workaround for some ODBC version which does not get the entire # string of the column name in the cursor descriptor. # This is hardcoded, so be careful #data = self._upper_columns(data) return data
[docs] def exists_table_or_view(self, objectname): """ Check if a table or view exists in self. Parameters ---------- objectname : str Name of the table or view to check. Returns ------- bool Raises ------ TypeError The object exists but is not of the expected type. Examples -------- >>> idadb.exists_table_or_view("NOT_EXISTING") False >>> idadb.exists_table_or_view("TABLE_OR_VIEW") True >>> idadb.exists_table_or_view("NO_TABLE_NOR_VIEW") TypeError : "NO_TABLE_NOR_VIEW" exists in schema '?' but of type '?' """ return self._exists(objectname,['T', 'V'])
[docs] def exists_table(self, tablename): """ Check if a table exists in self. Parameters ---------- tablename : str Name of the table to check. Returns ------- bool Raises ------ TypeError The object exists but is not of the expected type. Examples -------- >>> idadb.exists_table("NOT_EXISTING") False >>> idadb.exists_table("TABLE") True >>> idadb.exists_table("NO_TABLE") TypeError : "tablename" exists in schema "?" but of type '?' """ return self._exists(tablename,['T'])
[docs] def exists_view(self, viewname): """ Check if a view exists in self. Parameters ---------- viewname : str Name of the view to check. Returns ------- bool Raises ------ TypeError The object exists but is not of the expected type. Examples -------- >>> idadb.exists_view("NOT_EXISTING") False >>> idadb.exists_view("VIEW") True >>> idadb.exists_view("NO_VIEW") TypeError : "viewname" exists in schema "?" but of type '?' """ return self._exists(viewname,['V'])
[docs] def exists_model(self, modelname): """ Check if a model exists in self. Parameters ---------- modelname : str Name of the model to check. It should contain only alphanumeric characters and underscores. All lower case characters will be converted to upper case characters. Returns ------- bool Raises ------ TypeError The object exists but is not of the expected type. Examples -------- >>> idadb.exists_model("MODEL") True >>> idadb.exists_model("NOT_EXISTING") False >>> idadb.exists_model("NO_MODEL") TypeError : NO_MODEL exists but is not a model (of type '?') """ modelname = nzpyida.utils.check_modelname(modelname, self._upper_cased) if '.' in modelname: modelschema, modelname = modelname.split('.') else: modelschema = self.current_schema if self._is_netezza_system(): # on Netezza the model schema part of the model name is ignored modelquery = "SELECT count(*) FROM INZA.V_NZA_MODELS WHERE MODELNAME ='%s'"%modelname modelexists = self.ida_scalar_query(modelquery) >= 1 if modelexists: return True else: # check if schema exists to avoid exception thrown by idax.list_models schemaquery = "SELECT count(*) FROM SYSCAT.SCHEMATA WHERE SCHEMANAME = '%s'"%modelschema schemaexists = self.ida_scalar_query(schemaquery) >= 1 if not schemaexists: return False data = self.ida_query("call idax.list_models('schema=%s, where=MODELNAME=''%s''')" % (modelschema, modelname)) if not data.empty: return True tablelist = self.show_tables(show_all=True) tablelist = tablelist[(tablelist['TABSCHEMA']==modelschema) & (tablelist['TABNAME']==modelname)] if len(tablelist): tabletype = tablelist['TYPE'].values[0] raise TypeError("%s.%s exists, but is not a model (of type '%s')" % (modelschema, modelname, tabletype)) else: return False
[docs] def is_table_or_view(self, objectname): """ Check if an object is a table or a view in self. Parameters ---------- objectname : str Name of the object to check. Returns ------- bool Raises ------ ValueError objectname doesn't exist in the database. Examples -------- >>> idadb.is_table_or_view("NO_TABLE") False >>> idadb.is_table_or_view("TABLE") True >>> idadb.is_table_or_view("NOT_EXISTING") ValueError : NO_EXISTING does not exist in database """ return self._is(objectname,['T','V'])
[docs] def is_table(self, tablename): """ Check if an object is a table in self. Parameters ---------- tablename : str Name of the table to check. Returns ------- bool Raises ------ ValueError The object doesn't exist in the database. Examples -------- >>> idadb.is_table("NO_TABLE") False >>> idadb.is_table("TABLE") True >>> idadb.is_table("NOT_EXISTING") ValueError : NO_EXISTING does not exist in database """ return self._is(tablename,['T'])
[docs] def is_view(self, viewname): """ Check if an object is a view in self. Parameters ---------- viewname : str Name of the view to check. Returns ------- bool Raises ------ ValueError The object doesn't exist in the database. Examples -------- >>> idadb.is_view("NO_VIEW") False >>> idadb.is_view("VIEW") True >>> idadb.is_view("NOT_EXISTING") ValueError : NO_EXISTING does not exist in database """ return self._is(viewname,['V'])
[docs] def is_model(self, modelname): """ Check if an object is a model in self. Parameters ---------- modelname : str Name of the model to check. It should contain only alphanumeric characters and underscores. All lower case characters will be converted to upper case characters. Returns ------- bool Raises ------ ValueError The object doesn't exist in the database. Examples -------- >>> idadb.is_model("MODEL") True >>> idadb.is_model("NO_MODEL") False >>> idadb.is_model("NOT_EXISTING") ValueError : NOT_EXISTING doesn't exist in database """ modelname = nzpyida.utils.check_modelname(modelname, self._upper_cased) if '.' in modelname: modelname_noschema = modelname.split('.')[-1] else: modelname_noschema = modelname data = self.show_models() if not data.empty: if modelname_noschema in data[self.to_def_case('MODELNAME')].values: return True # This part is executed if data is empty or model is not in data try: self.is_table_or_view(modelname) except: raise else: return False
[docs] def ida_query(self, query, silent=False, first_row_only=False, autocommit = False): """ Prepare, execute and format the result of a query in a dataframe or in a Tuple. If nothing is expected to be returned for the SQL command, nothing is returned. Parameters ---------- query : str Query to be executed. silent: bool, default: False If True, the query is not printed in the python console even if the verbosity mode is activated (VERBOSE environment variable is equal to “True”). first_row_only : bool, default: False If True, only the first row of the result is returned as a Tuple. autocommit: bool, default: False If True, the autocommit function is available. Returns ------- DataFrame or Tuple (if first_row_only=False) Examples -------- >>> idadb.ida_query("SELECT * FROM IRIS LIMIT 5") sepal_length sepal_width petal_length petal_width species 0 5.1 3.5 1.4 0.2 setosa 1 4.9 3.0 1.4 0.2 setosa 2 4.7 3.2 1.3 0.2 setosa 3 4.6 3.1 1.5 0.2 setosa 4 5.0 3.6 1.4 0.2 setosa >>> idadb.ida_query("SELECT COUNT(*) FROM IRIS") (150, 150, 150, 150) Notes ----- If first_row_only argument is True, then even if the actual result of the query is composed of several rows, only the first row will be returned. """ self._check_connection() return sql.ida_query(self, query, silent, first_row_only, autocommit)
[docs] def ida_scalar_query(self, query, silent=False, autocommit = False): """ Prepare and execute a query and return only the first element as a string. If nothing is returned from the SQL query, an error occurs. Parameters ---------- query : str Query to be executed. silent: bool, default: False If True, the query will not be printed in python console even if verbosity mode is activated. autocommit: bool, default: False If True, the autocommit function is available. Returns ------- str or Number Examples -------- >>> idadb.ida_scalar_query("SELECT TRIM(CURRENT_SCHEMA) from SYSIBM.SYSDUMMY1") 'DASHXXXXX' Notes ----- Even if the actual result of the query is composed of several columns and several rows, only the first element (top-left) will be returned. """ self._check_connection() return sql.ida_scalar_query(self, query, silent, autocommit)
############################################################################### #### Upload DataFrames ###############################################################################
[docs] @timed def as_idadataframe(self, dataframe, tablename=None, clear_existing=False, primary_key=None, indexer=None): """ Upload a dataframe and return its corresponding IdaDataFrame. The target table (tablename) will be created or replaced if the option clear_existing is set to True. To add data to an existing tables, see IdaDataBase.append Parameters ---------- dataframe : DataFrame Data to be uploaded, contained in a Pandas DataFrame. tablename : str, optional Name to be given to the table created in the database. It should contain only alphanumeric characters and underscores. All lower case characters will be converted to upper case characters. If not given, a valid tablename is generated (for example, DATA_FRAME_X where X is a random number). clear_existing : bool If set to True, a table will be replaced when a table with the same name already exists in the database. primary_key : str Name of a column to be used as primary key. Returns ------- IdaDataFrame Raises ------ TypeError * Argument dataframe is not of type pandas.DataFrame. * The primary key argument is not a string. NameError * The name already exists in the database and clear_existing is False. * The primary key argument doesn't correspond to a column. PrimaryKeyError The primary key contains non unique values. Examples -------- >>> from nzpyida.sampledata.iris import iris >>> idadb.as_idadataframe(iris, "IRIS") <ibmdbpy.frame.IdaDataFrame at 0xb34a898> >>> idadb.as_idadataframe(iris, "IRIS") NameError: IRIS already exists, choose a different name or use clear_existing option. >>> idadb.as_idadataframe(iris, "IRIS2") <ibmdbpy.frame.IdaDataFrame at 0xb375940> >>> idadb.as_idadataframe(iris, "IRIS", clear_existing = True) <ibmdbpy.frame.IdaDataFrame at 0xb371cf8> Notes ----- This function is not intended to be used to add data to an existing table, rather to create a new table from a dataframe. To add data to an existing table, please consider using IdaDataBase.append """ if not isinstance(dataframe, pd.DataFrame): raise TypeError("Argument dataframe is not of type Pandas.DataFrame") if tablename is None: tablename = self._get_valid_tablename(prefix="DATA_FRAME_") tablename = nzpyida.utils.check_tablename(tablename, self._upper_cased) if primary_key is not None: if not isinstance(primary_key, six.string_types): raise TypeError("The primary key argument should be a string") if primary_key not in dataframe.columns: raise ValueError("The primary key should be the name of a column" + " in the given dataframe") if len(dataframe[primary_key]) != len(set(dataframe[primary_key])): raise PrimaryKeyError(primary_key + " cannot be a primary key for" + "table " + tablename + " because it contains" + " non unique values") if self.exists_table_or_view(tablename): if clear_existing: try: self.drop_table(tablename) except: self.drop_view(tablename) else: raise NameError(("%s already exists, choose a different name "+ "or use clear_existing option.")%tablename) self._create_table(dataframe, tablename, primary_key=primary_key) idadf = nzpyida.frame.IdaDataFrame(self, tablename, indexer) self.append(idadf, dataframe) ############## Experimental ################## # dataframe.to_sql(tablename, self._con, index=False) # idadf = ibmdbpy.frame.IdaDataFrame(self, tablename, flavor='mysql', # schema = self.current_schema) self._autocommit() if primary_key: idadf._indexer=primary_key return idadf
########################################################################### #### Delete DataBase objects ###########################################################################
[docs] def drop_table(self, tablename): """ Drop a table in the database. Parameters ---------- tablename : str Name of the table to drop. Raises ------ ValueError If the object does not exist. TypeError If the object is not a table. Examples -------- >>> idadb.drop_table("TABLE") True >>> idadb.drop_table("NO_TABLE") TypeError : NO_TABLE exists in schema '?' but of type '?' >>> idadb.drop_table("NOT_EXISTING") ValueError : NO_EXISTING doesn't exist in database Notes ----- This operation cannot be undone if autocommit mode is activated. """ return self._drop(tablename, "T")
[docs] def drop_view(self, viewname): """ Drop a view in the database. Parameters ---------- viewname : str Name of the view to drop. Raises ------ ValueError If the object does not exist. TypeError If the object is not a view. Examples -------- >>> idadb.drop_view("VIEW") True >>> idadb.drop_view("NO_VIEW") TypeError : NO_VIEW exists in schema '?' but of type '?' >>> idadb.drop_view("NOT_EXISTING") ValueError : NO_EXISTING doesn't exist in database Notes ----- This operation cannot be undone if autocommit mode is activated. """ return self._drop(viewname, "V")
[docs] def drop_model(self, modelname): """ Drop a model in the database. Parameters ---------- modelname : str Name of the model to drop. Raises ------ ValueError If the object does not exist. TypeError if the object exists but is not a model. Examples -------- >>> idadb.drop_model("MODEL") True >>> idadb.drop_model("NO_MODEL") TypeError : NO_MODEL exists in schema '?' but of type '?' >>> idadb.drop_model("NOT_EXISTING") ValueError : NOT_EXISTING does not exist in database Notes ----- This operation cannot be undone if autocommit mode is activated. """ try: self._call_stored_procedure("DROP_MODEL ", model = modelname) except Exception as e: try: flag = self.exists_table(modelname) except TypeError: # Exists but is not a model (nor a table) raise else: if flag: # It is a table so make it raise by calling exists_view self.exists_view(modelname) value_error = ValueError(modelname + " does not exist in database") six.raise_from(value_error, e) else: tables = self.show_tables() if not tables.empty: for table in tables['TABNAME']: if modelname in table: self.drop_table(table) return True
[docs] @timed def rename(self, idadf, newname): """ Rename a table referenced by an IdaDataFrame in Db2 Warehouse. Parameters ---------- idadf : IdaDataFrame IdaDataFrame object referencing the table to rename. newname : str Name to be given to self. It should contain only alphanumeric characters and underscores. All lower case characters will be converted to upper case characters. The new name should not already exist in the database. Raises ------ ValueError The new tablename is not valid. TypeError Rename function is supported only for table type. NameError The name of the object to be created is identical to an existing name. Notes ----- Upper case characters and numbers, optionally separated by underscores “_”, are valid characters. """ # Actually we could support it for views too # Question : Is it better to accept an idadf as argument or rather the name of the table? oldname = idadf._name newname = nzpyida.utils.check_tablename(newname, self._upper_cased) if self.is_table(idadf._name): if self._is_netezza_system(): query = "ALTER TABLE %s RENAME TO %s"%(idadf._name, newname) else: query = "RENAME TABLE %s TO %s"%(idadf._name, newname) try: self._prepare_and_execute(query) except Exception as e: if self._con_type == "odbc": raise NameError(e.value[-1]) else: if self._is_netezza_system(): if "ERROR: relation does not exist" in str(e.args[0]): raise ValueError("Object does not exist.") elif 'ERROR: ALTER TABLE: object "%s" already exists'%newname in str(e.args[0]): raise NameError("The new name is identical to the old one") else: raise e else: sql_code = int(str(e.args[0]).split("SQLCODE=")[-1].split(",")[0]) if sql_code == -601: raise NameError("The new name is identical to the old one") else: raise e idadf._name = newname idadf.tablename = newname idadf.internal_state.name = newname self._reset_attributes("cache_show_tables") # Update name of all IdaDataFrame that were opened on this table for idadf in self._idadfs: if idadf._name == oldname: idadf._name = newname idadf.internal_state.name = newname # to refactor else: raise TypeError("Rename function is supported only for table type")
def add_column(self, idadf, column = None, colname = None, ncat=10): """ Add physically a column to the dataset. If column argument is set to None, a random column is generated which can take values for 1 to ncat. Used for benchmark purpose. Note: This method is deprecated. Can probably be removed without impact """ if column is not None: if column not in idadf.columns: raise ValueError("Unknown column %s"%column) dtype = idadf.dtypes['TYPENAME'][column] if dtype == "VARCHAR": dtype = dtype + "(255)" else: dtype = "INTEGER" if colname is None: i = 0 colname = "COL%s"%i while colname in idadf.columns: i += 1 colname = "COL%s"%i else: if colname in idadf.columns: raise ValueError("Column %s already exists"%colname) self.commit() query = "ALTER TABLE %s ADD \"%s\" %s"%(idadf._name, colname, dtype) print(query) try: self._prepare_and_execute(query) except: print("Failed ! Trying again in 5 seconds") #query2 = "REORG TABLE %s"%idadf._name #print(query2) import time time.sleep(5) self._prepare_and_execute(query) self.commit() # force sleep 2 sec if column is None: # Random column query = "UPDATE %s SET \"%s\" = RAND()* %s + 1"%(idadf._name, colname, ncat) else: query = "UPDATE %s SET \"%s\" = \"%s\""%(idadf._name, colname, column) print(query) try: self._prepare_and_execute(query) except: print("Failed ! Trying again in 5 seconds") import time time.sleep(5) self._prepare_and_execute(query) self.commit() idadf._reset_attributes(["get_columns", "dtypes", "shape", 'org_columns_names'])
[docs] @timed def add_column_id(self, idadf, column_id="ID", destructive=False): # TODO: Base the creation of the idea on the sorting of several columns # (or all columns in case there are duplicated rows) so that the ID # can be created in a determinstic and reproducible way """ Add an ID column to an IdaDataFrame. Parameters ---------- idadf : IdaDataFrame IdaDataFrame object to which an ID column will be added column_id : str Name of the ID column to add destructive : bool If set to True, the column will be added phisically in the database. This can take time. If set to False, the column will be added virtually in a view and a new IdaDataFrame is returned. Raises ------ TypeError idadf is not an IdaDataFrame. ValueError The given column name already exists in the DataBase. Notes ----- The non-destructive creation of column IDs is not reliable, because row IDs are recalculated on the fly in a non-deterministic way each time a new view is produced. On the contrary, creating them destructively i.e physically is reliable but can take time. If no sorting has been done whatsoever before, row IDs will be created at random. Improvement idea: create ID columns in a non-destructive way and base them on the sorting of a set of columns, defined by the user, or all columns if no column combination results in unique identifiers. """ if isinstance(idadf, nzpyida.IdaSeries): raise TypeError("Adding column ID is not supported for IdaSeries") if not isinstance(idadf, nzpyida.IdaDataFrame): raise TypeError("idadf is not an IdaDataFrame, type: %s"%type(idadf)) if column_id in idadf.columns: raise ValueError("A column named '"+column_id+"' already exists." + " Please define a new column name using"+ " column_id argument") if destructive is True: viewname = self._get_valid_tablename(prefix="VIEW_") if self._is_netezza_system(): order_by = "ORDER BY NULL" else: order_by = "" self._prepare_and_execute("CREATE VIEW " + viewname + " AS SELECT ((ROW_NUMBER() OVER("+ order_by +"))-1)" + " AS \"" + column_id + "\", \""+ "\",\"".join(idadf._get_all_columns_in_table()) + "\" FROM " + idadf._name) # Initiate the modified table under a random name tablename = self._get_valid_tablename(prefix="DATA_FRAME_") if self._is_netezza_system(): self._prepare_and_execute("CREATE TABLE %s AS (SELECT * FROM %s)"%(tablename,viewname)) else: self._prepare_and_execute("CREATE TABLE %s LIKE %s"%(tablename,viewname)) self._prepare_and_execute("INSERT INTO %s (SELECT * FROM %s)"%(tablename,viewname)) # Drop the view and old table self.drop_view(viewname) self.drop_table(idadf._name) # Give it the original name back self._reset_attributes("cache_show_tables") new_idadf = nzpyida.IdaDataFrame(self, tablename) self.rename(new_idadf, idadf.tablename) # Updating internal state # prepend the columndict OrderedDict items = idadf.internal_state.columndict.items() idadf.internal_state.columndict = OrderedDict() idadf.internal_state.columndict[column_id] = "\"" + column_id + "\"" for item in items: idadf.internal_state.columndict[item[0]] = item[1] idadf.internal_state.update() idadf._reset_attributes(["get_columns", "org_columns_names"]) idadf.indexer = column_id else: # prepend the columndict OrderedDict items = idadf.internal_state.columndict.items() idadf.internal_state.columndict = OrderedDict() if self._is_netezza_system(): order_by = "ORDER BY NULL" else: order_by = "" idadf.internal_state.columndict[column_id] = "((ROW_NUMBER() OVER("+ order_by +"))-1)" for item in items: idadf.internal_state.columndict[item[0]] = item[1] newColumndict = idadf.internal_state.columndict idadf.internal_state.update() idadf._reset_attributes(["get_columns"]) idadf.internal_state.columndict = newColumndict idadf.indexer = column_id # Reset attributes idadf._reset_attributes(['shape', 'axes', 'dtypes'])
[docs] def delete_column(self, idadf, column_name, destructive=False): """ Delete a column in an idaDataFrame. Parameters ---------- idadf : IdaDataFrame The IdaDataframe in which a column should be deleted. column_name : str Name of the column to delete. destructive : bool If set to True, the column is deleted in the database. Otherwise, it is deleted virtually, creating a view for the IdaDataFrame. Raises ------ TypeError column_name should be a string. ValueError column_name refers to a column that doesn't exist in self. """ if not isinstance(column_name, six.string_types): raise TypeError("column_name is not of string type") if column_name not in idadf.columns: raise ValueError("%s refers to a columns that doesn't exists in self"%(column_name)) if destructive is True: if column_name not in idadf._get_all_columns_in_table(): # Detect it is a virtual ID, the deletion cannot be destructive return self.delete_column(idadf, column_name, destructive=False) viewname = self._get_valid_tablename(prefix="VIEW_") columnlist = list(idadf._get_all_columns_in_table()) columnlist.remove(column_name) self._prepare_and_execute("CREATE VIEW " + viewname + " AS SELECT \""+ "\",\"".join(columnlist) + "\" FROM " + idadf._name) tablename = self._get_valid_tablename(prefix="DATA_FRAME_") if self._is_netezza_system(): self._prepare_and_execute("CREATE TABLE %s AS (SELECT * FROM %s)"%(tablename,viewname)) else: self._prepare_and_execute("CREATE TABLE " + tablename + " LIKE " + viewname) self._prepare_and_execute("INSERT INTO " + tablename + " (SELECT * FROM " + viewname + ")") # Drop the view and old table self.drop_view(viewname) self.drop_table(idadf._name) # Give it the original name back self._reset_attributes("cache_show_tables") # normally, no needed new_idadf = nzpyida.IdaDataFrame(self, tablename, idadf.indexer) self.rename(new_idadf, idadf.tablename) # updating internal state del idadf.internal_state.columndict[column_name] idadf.internal_state.update() self._reset_attributes("cache_show_tables") idadf._reset_attributes(['shape', 'get_columns', 'dtypes', 'org_columns_names']) else: del idadf.internal_state.columndict[column_name] newColumndict = idadf.internal_state.columndict idadf.internal_state.update() idadf._reset_attributes(["get_columns", "shape", "dtypes"]) idadf.internal_state.columndict = newColumndict if column_name == idadf.indexer: idadf._reset_attributes(["_indexer"])
[docs] def append(self, idadf, df, maxnrow=None): """ Append rows of a DataFrame to an IdaDataFrame. The DataFrame must have the same structure (same column names and datatypes). Optionally, the DataFrame to be added can be splitted into several chunks. This improves performance and prevents SQL overflows. By default, chunks are limited to 100.000 cells. Parameters ---------- idadf : IdaDataFrame IdaDataFrame that receives data from dataframe df. df : DataFrame Dataframe whose rows are added to IdaDataFrame idadf. maxnrow : int, optional number corresponding to the maximum number of rows for each chunks. Raises ------ TypeError * maxnrow should be an interger. * Argument idadf should be an IdaDataFrame. * Argument df should be a pandas DataFrame. ValueErrpr * maxnrow should be greater than 1 or nleft blank. * Other should be a Pandas DataFrame. * Other dataframe has not the same number of columns as self. * Some columns in other have different names that are different from the names of the columns in self. """ # SANITY CHECK : maxnrow if maxnrow is None: # Note : it has been measured on a big dataset (>1 million rows) that int(100000 / len(df.columns)) # performs better than the previous empirical value int(8000 / len(df.columns)) maxnrow = int(100000 / len(df.columns)) else: if not isinstance(maxnrow, six.integer_types): raise TypeError("maxnrow is not an integer") if maxnrow < 1: raise ValueError("maxnrow should be stricly positive or omitted") if maxnrow > 15000: warnings.warn("Performance may decrease if maxnrow is bigger than 15000", UserWarning) # SANITY CHECK : idadf & other if not isinstance(idadf, nzpyida.frame.IdaDataFrame): raise TypeError("Argument idadf is not an IdaDataFrame") if not isinstance(df, pd.DataFrame): raise TypeError("Argument df is not of type pPandas.DataFrame") if len(df.columns) != len(idadf.columns): raise ValueError("(Ida)DataFrames don't have the same number of columns") if any([column not in idadf.columns for column in [str(x).strip() for x in df.columns]]): raise ValueError("Some column names do not match current object columns: \n" + "Expected : \t" + str(idadf.columns) + "\n" + "Found : \t" + str([x.strip() for x in df.columns]) + "\n") if any([str(column_idadf) != str(column_other).strip() for column_idadf, column_other in zip(idadf.columns, df.columns)]): raise ValueError("Order or columns in other and" + idadf._name + "does not match.") if df.shape[0] > 1.5 * maxnrow: split_into = math.ceil(df.shape[0] / maxnrow) split = np.array_split(df, split_into) print("DataFrame will be splitted into " + str(split_into) + " chunks. (" + str(maxnrow) + " rows per chunk)") for i, chunk in enumerate(split, 0): percentage = int(i / split_into * 100) print("Uploaded: " + str(percentage) + "%... ", end="\r") try: self._insert_into_database(chunk, idadf.schema, idadf.tablename, silent=True) except: raise print("Uploaded: %s/%s... "%(split_into,split_into), end="") print("[DONE]") else: print("Uploading %s rows (maxnrow was set to %s)"%(df.shape[0], maxnrow)) try: self._insert_into_database(df, idadf.schema, idadf.tablename, silent=True) except: raise idadf._reset_attributes(['shape', 'axes', 'dtypes', 'index'])
############################################################################### #### Connection management ###############################################################################
[docs] def commit(self): """ Commit operations in the database. Notes ----- All changes that are made in the database after the last commit, including those in the child IdaDataFrames, are commited. If the environment variable ‘VERBOSE’ is set to True, the commit operations are notified in the console. """ self._check_connection() # Important self._con.commit() if os.getenv('VERBOSE') == 'True': print("<< COMMIT >>") self._reset_attributes("cache_show_tables")
[docs] def rollback(self): """ Rollback operations in the database. Notes ----- All changes that are made in the database after the last commit, including those in the child IdaDataFrames, are discarded. """ self._check_connection() # Important self._con.rollback() if os.getenv('VERBOSE') == 'True': print("<< ROLLBACK >>") self._reset_attributes("cache_show_tables")
[docs] def close(self): """ Close the IdaDataBase connection. Notes ----- If the environment variable ‘AUTOCOMMIT’ is set to True, then all changes after the last commit are committed, otherwise they are discarded. """ if os.getenv('AUTOCOMMIT') == 'True': self.commit() else: self.rollback() self._reset_attributes("cache_show_tables") self._con.close() print("Connection closed.")
[docs] def reconnect(self): """ Try to reopen the connection. """ try: self._check_connection() except IdaDataBaseError: if self._con_type == 'odbc': import pyodbc try: self._con = pyodbc.connect(self._connection_string) except: raise else: print("The connection was successfully restored") elif self._con_type == 'nzpy': import nzpy try: self._con = nzpy.connect(**(self._connection_string)) except: raise else: print("The connection was successfully restored") elif self._con_type == 'jdbc': try: import jaydebeapi if self._is_netezza_system(): self._con = jaydebeapi.connect('org.netezza.Driver', self._connection_string) else: self._con = jaydebeapi.connect('com.ibm.db2.jcc.DB2Driver', self._connection_string) except: raise else: print("The connection was successfully restored") else: print("The connection for current IdaDataBase is valid")
############################################################################### #### Private methods ############################################################################### def __enter__(self): """ Allow the object to be used with a "with" statement """ return self def __exit__(self): """ Allow the object to be used with a "with" statement. Make sure the connection is closed when the object get out of scope """ self.close() def _exists(self, objectname, typelist): """ Check if an object of a certain type exists in Db2 Warehouse. Notes ----- For more information, see exists_table_or_view, exists_table, exists_view functions. """ objectname = nzpyida.utils.check_tablename(objectname, self._upper_cased) tablelist = self.show_tables(show_all=True) schema, name = self._get_name_and_schema(objectname) tablelist = tablelist[tablelist['TABSCHEMA'] == schema] if len(tablelist): if name in tablelist['TABNAME'].values: tabletype = tablelist[tablelist['TABNAME'] == name]['TYPE'].values[0] if tabletype in typelist: return True else: raise TypeError("%s exists in schema %s but of type \"%s\"" %(objectname,schema,tabletype)) else: return False else: return False def _is(self, objectname, typelist): """ Check if an existing object is of a certain type or in a list of types. Notes ----- For more information, see is_table_or_view, is_table, is_view functions. """ objectname = nzpyida.utils.check_tablename(objectname, self._upper_cased) tablelist = self.show_tables(show_all=True) schema, name = self._get_name_and_schema(objectname) tablelist = tablelist[tablelist['TABSCHEMA'] == schema] if len(tablelist): if name in tablelist['TABNAME'].values: tabletype = tablelist[tablelist['TABNAME'] == name]['TYPE'].values[0] if tabletype in typelist: return True else: return False raise ValueError("%s does not exist in database"%(objectname)) def _drop(self, objectname, object_type = "T"): """ Drop an object in the table depending on its type. Admissible type values are "T" (table) and "V" (view) Notes ----- For more information, seedrop_table and drop_view functions. """ objectname = nzpyida.utils.check_tablename(objectname, self._upper_cased) if object_type == "T": to_drop = "TABLE" elif object_type == "V": to_drop = "VIEW" else: raise ValueError("Unknown type to drop") try: self._prepare_and_execute("DROP %s %s"%(to_drop,objectname)) except Exception as e: if self._con_type == "odbc": if e.value[0] == "42S02": raise ValueError(e.value[1]) # does not exist if e.value[0] == "42809": raise TypeError(e.value[1]) # object is not of expected type else: if self._is_netezza_system(): if "ERROR: relation does not exist" in str(e.args[0]): raise ValueError("Object does not exist.") else: raise e else: sql_code = int(str(e.args[0]).split("SQLCODE=")[-1].split(",")[0]) if sql_code == -204: raise ValueError("Object does not exist.") elif sql_code == -159: raise TypeError("Object is not of expected type") else: raise e # let the expection raise anyway else: self._reset_attributes("cache_show_tables") return True def _upper_columns(self, dataframe): # Could be moved to utils (then move in the test too) """ Put every column name of a Pandas DataFrame in upper case. Parameters ---------- dataframe : DataFrame Returns ------- DataFrame """ data = deepcopy(dataframe) if len(data): data.columns = [x.upper() for x in data.columns] return data def _get_name_and_schema(self, objectname): """ Helper function that returns the name and the schema from an object name. Implicitly, if no schema name was given, it is assumed that user refers to the current schema. Parameters ---------- objectname : str Name of the object to process. Can be either under the form "SCHEMA.TABLE" or just "TABLE" Returns ------- tuple A tuple composed of 2 strings containing the schema and the name. Examples -------- >>> _get_name_and_schema(SCHEMA.TABLE) (SCHEMA, TABLE) >>> _get_name_and_schema(TABLE) (<current schema>, TABLE) """ if '.' in objectname: name = objectname.split('.')[-1] schema = objectname.split('.')[0] else: name = objectname schema = self.current_schema return (schema, name) def _get_valid_tablename(self, prefix="DATA_FRAME_"): """ Generate a valid database table name. Parameters ---------- prefix : str, default: "DATA_FRAME_" Prefix used to create the table name. The name is constructed using this pattern : <prefix>_X where <prefix> corresponds to the string parameter “prefix” capitalized and X corresponds to a pseudo randomly generated number (0-100000). Returns ------- str Examples -------- >>> idadb._get_valid_tablename() 'DATA_FRAME_49537_1434978215' >>> idadb._get_valid_tablename("MYDATA_") 'MYDATA_65312_1434978215' >>> idadb._get_valid_tablename("mydata_") 'MYDATA_78425_1434978215' >>> idadb._get_valid_tablename("mydata$") ValueError: Table name is not valid, only alphanumeric characters and underscores are allowed. """ prefix = nzpyida.utils.check_tablename(prefix, self._upper_cased) # We may assume that the generated name is unlikely to exist name = "%s%s_%s" % (prefix, random.randint(0, 100000), int(time())) return name def _get_valid_viewname(self, prefix="VIEW_"): """ Convenience function : Alternative name for _get_valid_tablename. The parameter prefix has its optional value changed to "VIEW\_"". Examples -------- >>> idadb._get_valid_viewname() 'VIEW_49537_1434978215' >>> idadb._get_valid_viewname("MYVIEW_") 'MYVIEW_65312_1434978215' >>> idadb._get_valid_viewname("myview_") 'MYVIEW_78425_1434978215' >>> idadb._get_valid_modelname("myview$") ValueError: View name is not valid, only alphanumeric characters and underscores are allowed. """ return self._get_valid_tablename(prefix) def _get_valid_modelname(self, prefix="MODEL_"): """ Convenience function : Alternative name for _get_valid_tablename. Parameter prefix has its optional value changed to "MODEL\_". Examples -------- >>> idadb._get_valid_modelname() 'MODEL_49537_1434978215' >>> idadb._get_valid_modelname("TEST_") 'TEST_65312_1434996318' >>> idadb._get_valid_modelname("test_") 'TEST_78425_1435632423' >>> idadb._get_valid_tablename("mymodel$") ValueError: Table name is not valid, only alphanumeric characters and underscores are allowed. """ return self._get_valid_tablename(prefix) def _create_table(self, dataframe, tablename, primary_key=None): """ Create a new table in the database by declaring its name and columns based on an existing DataFrame. It is possible declare a column as primary key. Parameters ---------- dataframe : DataFrame Pandas DataFrame be used to initiate the table. tablename : str Name to be given to the table at its creation. primary_key: str Name of a column to declare as primary key. Notes ----- The columns and their data type is deducted from the Pandas DataFrame given as parameter. Examples -------- >>> from nzpyida.sampledata.iris import iris >>> idadb._create_table(iris, "IRIS") 'IRIS' >>> idadb._create_table(iris) 'DATA_FRAME_4956' """ # TODO : Handle more types # integer_attributes = # ['int_', 'intc','BIGINT','REAL','DOUBLE','FLOAT','DECIMAL','NUMERIC'] # double_attributes = # ['SMALLINT', 'INTEGER','BIGINT','REAL','DOUBLE','FLOAT','DECIMAL','NUMERIC'] # self._check_connection() if not isinstance(dataframe, pd.DataFrame): raise TypeError("_create_table is valid only for DataFrame objects") if primary_key is not None: if not isinstance(primary_key, six.string_types): raise TypeError("primary_key argument should be a string") # Check the tablename tablename = nzpyida.utils.check_tablename(tablename, self._upper_cased) # for Netezza we have to check if the schema exists already # otherwise we have to create it if self._is_netezza_system() & ("." in tablename): schemaname, tabname = tablename.split(".") if self.ida_scalar_query("SELECT COUNT(*) FROM _V_SCHEMA WHERE SCHEMA='%s'"%schemaname) == 0: self.ida_query("CREATE SCHEMA " + schemaname) column_string = '' for column in dataframe.columns: if dataframe.dtypes[column] in [object,bool]: # Handle boolean type if set(dataframe[column].unique()).issubset([True, False, 0, 1, np.nan]): column_string += "\"%s\" SMALLINT," % str(column).strip() else: if column == primary_key: column_string += "\"%s\" VARCHAR(255) NOT NULL, PRIMARY KEY (\"%s\")," % (str(column).strip(), str(column).strip()) else: column_string += "\"%s\" VARCHAR(255)," % str(column).strip() elif dataframe.dtypes[column] == np.dtype('datetime64[ns]'): # This is a first patch for handling dates # TODO: Dates as timestamp in the database column_string += "\"%s\" VARCHAR(255)," % str(column).strip() else: if dataframe.dtypes[column] in [np.int64, int, np.int8, np.int32]: if abs(dataframe[column].max()) < 2147483647/2: # might get bigger datatype = "INT" else: datatype = "BIGINT" else: datatype = "DOUBLE" if column == primary_key: column_string += "\"%s\" %s NOT NULL, PRIMARY KEY (\"%s\")," % (str(column).strip(), datatype, str(column).strip()) else: column_string += "\"%s\" %s," %(str(column.strip()), datatype) if column_string[-1] == ',': column_string = column_string[:-1] if '.' in tablename: schema, tablename2 = tablename.split('.') else: schema = self.current_schema tablename2 = tablename create_table = "CREATE TABLE \"%s\".\"%s\" (%s)" % (schema, tablename2, column_string) self._prepare_and_execute(create_table, autocommit=False) # Save new table in cache if hasattr(self, "cache_show_tables"): record = (schema, tablename2, self.current_schema, 'T') self.cache_show_tables.loc[len(self.cache_show_tables)] = np.array(record) return "\"%s\".\"%s\"" % (schema, tablename2) def _create_view(self, idadf, viewname = None): """ Create a new view in the database from an existing table. Parameters ---------- idadf : IdaDataFrame IdaDataFrame to be duplicated as view. viewname : str, optional Name to be given to the view at its creation. If not given, a random name will be generated automatically Returns ------- str View name. Examples -------- >>> idadf = IdaDataFrame(idadb, "IRIS") >>> idadb._create_view(idadf) 'IDAR_VIEW_4956' """ if not isinstance(idadf, nzpyida.frame.IdaDataFrame): raise TypeError("_create_view is valid only for IdaDataFrame objects") # Check the viewname if viewname is not None: viewname = nzpyida.utils.check_viewname(viewname, self._upper_cased) else: viewname = self._get_valid_viewname() self._prepare_and_execute("CREATE VIEW \"%s\" AS SELECT * FROM %s" % (viewname, idadf._name)) # Save new view in cache if hasattr(self, "cache_show_tables"): record = (self.current_schema, viewname, self.current_schema, 'V') self.cache_show_tables.loc[len(self.cache_show_tables)] = np.array(record) return viewname def _create_view_from_expression(self, expression, viewname = None): """ Create a new view in the database from an expression. Parameters ---------- expression : str Expression that defines the view to be created viewname : str, optional Name to be given to the view at its creation. If not given, a random name will be generated automatically Returns ------- str View name. Examples -------- TODO """ if not isinstance(expression, six.string_types): raise TypeError("expression argument expected to be of string type") # Check the viewname if viewname is not None: viewname = nzpyida.utils.check_viewname(viewname, self._upper_cased) else: viewname = self._get_valid_viewname() self._prepare_and_execute("CREATE VIEW \"%s\" AS %s" % (viewname, expression)) # Save new view in cache if hasattr(self, "cache_show_tables"): record = (self.current_schema, viewname, self.current_schema, 'V') self.cache_show_tables.loc[len(self.cache_show_tables)] = np.array(record) return viewname def _insert_into_database(self, dataframe, schema, tablename, silent=True): """ Populate an existing table with data from a dataframe. Parameters ---------- dataframe: DataFrame Data to be inserted into an existing table, contained in a Pandas DataFrame. It is assumed that the structure matches. schema: str Schema of the table in which the data is inserted. tablename: str Name of the table in which the data is inserted. silent : bool, default: True If True, the INSERT statement is not printed. Avoids flooding the console. """ # TODO : Handle more datatypes if schema is None or schema.strip() == '': schema = self.current_schema tablename = nzpyida.utils.check_tablename(tablename, self._upper_cased) column_string = '\"%s\"' % '\", \"'.join([str(x).strip() for x in dataframe.columns]) row_string = '' # Save in a list columns that are booleans boolean_flaglist = [] for column in dataframe.columns: if set(dataframe[column].unique()).issubset([True, False, 0, 1, np.nan]): boolean_flaglist.append(1) else: boolean_flaglist.append(0) if self._is_netezza_system(): sepstr1 = 'SELECT ' sepstr2 = ' UNION ALL SELECT ' sepstr3 = ' ' else: sepstr1 = 'VALUES (' sepstr2 = '), (' sepstr3 = ') ' row_separator = sepstr1 for rows in dataframe.values: value_string = '' for colindex, value in enumerate(rows): if pd.isnull(value): # handles np.nan and None value_string += "NULL," # Handle missing values elif isinstance(value, six.string_types): ## Handle apostrophe in values value = value.replace("\\", "'") value_string += '\'%s\',' % value.replace("'", "''") # REMARK: it is the best way to handle booleans ? elif isinstance(value, bool): if boolean_flaglist[colindex] == True: if value in [1, True]: value_string += '1,' elif value in [0, False]: value_string += '0,' else: value_string += '\'%s\',' % value # TODO: Handle datetime better than strings elif isinstance(value, datetime.datetime): value_string += '\'%s\',' % value else: value_string += '%s,' % value if value_string[-1] == ',': value_string = value_string[:-1] row_string += (row_separator + " %s ") % value_string row_separator = sepstr2 row_string = row_string + sepstr3 if row_string[-2:] == '),': row_string = row_string[:-2] if row_string[0] == '(': row_string = row_string[1:] query = ("INSERT INTO \"%s\".\"%s\" (%s) (%s)" % (schema, tablename, column_string, row_string)) # print(query) # TODO: Good idea : create a savepoint before creating the table # Rollback in to savepoint in case of failure self._prepare_and_execute(query, autocommit=False, silent=silent) for idadf in self._idadfs: if idadf._name == tablename: idadf._reset_attributes(["shape", "index"]) def _prepare_and_execute(self, query, autocommit=True, silent=False): """ Prepare and execute a query by using the cursor of an idaobject. Parameters ---------- idaobject: IdaDataBase or IdaDataFrame query: str Query to be executed. autocommit: bool, default: True If True, the autocommit function is available. silent: bool, default: False If True, the SQL statement is not printed. """ self._check_connection() return sql._prepare_and_execute(self, query, autocommit, silent) def _check_procedure(self, proc_name, alg_name=None): """ Check if a procedure is available in the database. Parameters ---------- proc_name : str Name of the procedure to be checked as defined in the underlying database management system. alg_name : str Name of the algorithm, human readable. Returns ------- bool Examples -------- >>> idadb._check_procedure('KMEANS') True >>> idadb._check_procedure('NOT_EXISTING') IdaDatabaseError: Function 'NOT_EXISTING' is not available. """ if alg_name is None: alg_name = proc_name if self._is_netezza_system(): query = ("SELECT COUNT(*) FROM NZA.._V_PROCEDURE WHERE UPPER(PROCEDURE)='%s'") % proc_name else: query = ("SELECT COUNT(*) FROM SYSCAT.ROUTINES WHERE ROUTINENAME='%s" + "' AND ROUTINEMODULENAME = 'IDAX'") % proc_name flag = self.ida_scalar_query(query) if int(flag) == False: raise IdaDataBaseError("Function '%s' is not available." % alg_name) else: return True def _call_stored_procedure(self, sp_name, **kwargs): """ Call a specific IDAX/INZA stored procedure and return its result. Parameters ---------- sp_name : str Name of the stored procedure. **kwargs : ... Additional parameters, specific to the called stored procedure. """ tmp = [] views = [] if self._is_netezza_system(): sp_schema = 'NZA.' else: sp_schema = 'IDAX' for key, value in six.iteritems(kwargs): if value is None: continue # Go to next iteration if isinstance(value, nzpyida.frame.IdaDataFrame): tmp_view_name = self._create_view(value) tmp.append("%s=%s" % (key, tmp_view_name)) views.append(tmp_view_name) elif isinstance(value, six.string_types) and all([x != " " for x in value]): if key in ("intable", 'model', 'outtable', 'incolumn', 'nametable', 'colPropertiesTable'): tmp.append("%s=%s"% (key, value)) # no " in the case it is a table name else: tmp.append("%s=\"%s\"" % (key, value)) elif isinstance(value, list): tmp.append("%s=\"%s\"" % (key, " ".join(str(value)))) else: tmp.append("%s=%s" % (key, value)) try: call = "CALL %s.%s('%s')" % (sp_schema, sp_name, ",".join(tmp)) result = self._prepare_and_execute(call) except: if self._is_netezza_system(): error_msg = "Error" else: error_msg = self.ida_scalar_query("values idax.last_message") raise IdaDataBaseError(error_msg) finally: for view in views: self.drop_view(view) return result def _autocommit(self): """ Commit changes made to the database in the connection automatically. If the environment variable 'AUTOCOMMIT' is set to True, then commit. Notes ----- In the case of a commit operation, all changes that are made in the Database after the last commit, including those in the children IdaDataFrames, will be commited. If the environment variable 'VERBOSE' is not set to 'True', the autocommit operations will not be notified in the console to the user. """ if os.getenv('AUTOCOMMIT') == 'True': self._con.commit() if os.getenv('VERBOSE') == 'True': print("<< AUTOCOMMIT >>") def _check_connection(self): """ Check if the connection still exists by trying to open a cursor. """ if self._con_type == "odbc": try: self._con.cursor() except Exception as e: raise IdaDataBaseError(e.value[-1]) elif self._con_type == "nzpy": try: self._con.cursor().execute("select 1") except Exception as e: raise IdaDataBaseError(e) elif self._con_type == "jdbc": # This avoids infinite recursions on Netezza due to reconnect calls in # sql.ida_query, sql.ida_scalar_query and sql._prepare_and_execute if self._con._closed: raise IdaDataBaseError("The connection is closed") try: # Avoid infinite recursion if self._is_netezza_system(): # On Netezza no result sets are returned after the first database error if read_sql("SELECT OBJID FROM _V_TABLE LIMIT 1", self._con) is None: raise IdaDataBaseError("The connection is closed") else: read_sql("SELECT TABLEID FROM SYSCAT.TABLES LIMIT 1", self._con) except Exception as e: raise IdaDataBaseError("The connection is closed") def _retrieve_cache(self, cache): """ Helper function that retrieve cache if available. Cache are just string type values stored in private attributes. """ if not isinstance(cache, six.string_types): raise TypeError("cache is not of string type") self._check_connection() if hasattr(self, cache): return getattr(self,cache) else: return None def _reset_attributes(self, attributes): """ Helper function that delete attributes given as parameter if they exists in self. This is used to refresh lazy attributes and caches. """ nzpyida.utils._reset_attributes(self, attributes) def _is_netezza_system(self): """ Checks if the underlying database system is Netezza. """ return self._database_system == 'netezza' def to_def_case(self, text): """ Converts the given object to the default case on this database. The object can be a string or list of strings. """ if isinstance(text, str): return text.upper() if self._upper_cased else text.lower() elif isinstance(text, list): return [x.upper() if self._upper_cased else x.lower() for x in text] else: return text