Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/pandas/io/sql.py
7813 views
1
"""
2
Collection of query wrappers / abstractions to both facilitate data
3
retrieval and to reduce dependency on DB-specific API.
4
"""
5
6
from __future__ import annotations
7
8
from contextlib import contextmanager
9
from datetime import (
10
date,
11
datetime,
12
time,
13
)
14
from functools import partial
15
import re
16
from typing import (
17
Any,
18
Iterator,
19
Sequence,
20
cast,
21
overload,
22
)
23
import warnings
24
25
import numpy as np
26
27
import pandas._libs.lib as lib
28
from pandas._typing import DtypeArg
29
from pandas.compat._optional import import_optional_dependency
30
from pandas.errors import AbstractMethodError
31
from pandas.util._exceptions import find_stack_level
32
33
from pandas.core.dtypes.common import (
34
is_datetime64tz_dtype,
35
is_dict_like,
36
is_list_like,
37
)
38
from pandas.core.dtypes.dtypes import DatetimeTZDtype
39
from pandas.core.dtypes.missing import isna
40
41
from pandas import get_option
42
from pandas.core.api import (
43
DataFrame,
44
Series,
45
)
46
from pandas.core.base import PandasObject
47
import pandas.core.common as com
48
from pandas.core.tools.datetimes import to_datetime
49
from pandas.util.version import Version
50
51
52
class DatabaseError(OSError):
    """Error raised when executing a SQL statement against a database fails."""

    pass
54
55
56
# -----------------------------------------------------------------------------
57
# -- Helper functions
58
59
60
def _gt14() -> bool:
    """
    Return True when the installed sqlalchemy is at least 1.4.0, the
    release in which several deprecations were made.
    """
    import sqlalchemy

    minimum = Version("1.4.0")
    return Version(sqlalchemy.__version__) >= minimum
68
69
70
def _convert_params(sql, params):
71
"""Convert SQL and params args to DBAPI2.0 compliant format."""
72
args = [sql]
73
if params is not None:
74
if hasattr(params, "keys"): # test if params is a mapping
75
args += [params]
76
else:
77
args += [list(params)]
78
return args
79
80
81
def _process_parse_dates_argument(parse_dates):
82
"""Process parse_dates argument for read_sql functions"""
83
# handle non-list entries for parse_dates gracefully
84
if parse_dates is True or parse_dates is None or parse_dates is False:
85
parse_dates = []
86
87
elif not hasattr(parse_dates, "__iter__"):
88
parse_dates = [parse_dates]
89
return parse_dates
90
91
92
def _handle_date_column(
93
col, utc: bool | None = None, format: str | dict[str, Any] | None = None
94
):
95
if isinstance(format, dict):
96
# GH35185 Allow custom error values in parse_dates argument of
97
# read_sql like functions.
98
# Format can take on custom to_datetime argument values such as
99
# {"errors": "coerce"} or {"dayfirst": True}
100
error = format.pop("errors", None) or "ignore"
101
return to_datetime(col, errors=error, **format)
102
else:
103
# Allow passing of formatting string for integers
104
# GH17855
105
if format is None and (
106
issubclass(col.dtype.type, np.floating)
107
or issubclass(col.dtype.type, np.integer)
108
):
109
format = "s"
110
if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]:
111
return to_datetime(col, errors="coerce", unit=format, utc=utc)
112
elif is_datetime64tz_dtype(col.dtype):
113
# coerce to UTC timezone
114
# GH11216
115
return to_datetime(col, utc=True)
116
else:
117
return to_datetime(col, errors="coerce", format=format, utc=utc)
118
119
120
def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.

    Supports both string formatted and integer timestamp columns.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # we want to coerce datetime64_tz dtypes for now to UTC
    # we could in theory do a 'nice' conversion from a FixedOffset tz
    # GH11216
    for col_name, df_col in data_frame.items():
        if is_datetime64tz_dtype(df_col.dtype) or col_name in parse_dates:
            try:
                fmt = parse_dates[col_name]
            except (KeyError, TypeError):
                # BUG FIX: also catch KeyError — a tz-aware column that is
                # not listed in a dict-valued parse_dates raised KeyError
                # here (only TypeError, from list-valued parse_dates, was
                # handled before).
                fmt = None
            data_frame[col_name] = _handle_date_column(df_col, format=fmt)

    return data_frame
139
140
141
def _wrap_result(
    data,
    columns,
    index_col=None,
    coerce_float: bool = True,
    parse_dates=None,
    dtype: DtypeArg | None = None,
):
    """Wrap result set of query in a DataFrame."""
    frame = DataFrame.from_records(data, columns=columns, coerce_float=coerce_float)

    if dtype:
        # apply the explicitly requested dtypes before date parsing
        frame = frame.astype(dtype)

    frame = _parse_date_columns(frame, parse_dates)

    if index_col is not None:
        frame.set_index(index_col, inplace=True)

    return frame
161
162
163
def execute(sql, con, params=None):
    """
    Execute the given SQL query using the provided connection object.

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection) or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by the
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    return pandasSQL_builder(con).execute(*_convert_params(sql, params))
185
186
187
# -----------------------------------------------------------------------------
188
# -- Read and write to DataFrames
189
190
191
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col=...,
    coerce_float=...,
    parse_dates=...,
    columns=...,
    chunksize: None = ...,
) -> DataFrame:
    # overload: without a chunksize the whole table is returned as one DataFrame
    ...
203
204
205
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col=...,
    coerce_float=...,
    parse_dates=...,
    columns=...,
    chunksize: int = ...,
) -> Iterator[DataFrame]:
    # overload: with an int chunksize an iterator of DataFrame chunks is returned
    ...
217
218
219
def read_sql_table(
    table_name: str,
    con,
    schema: str | None = None,
    index_col: str | Sequence[str] | None = None,
    coerce_float: bool = True,
    parse_dates=None,
    columns=None,
    chunksize: int | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as str.
        SQLite DBAPI connection mode not supported.
    schema : str, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame or Iterator[DataFrame]
        A SQL table is returned as two-dimensional data structure with labeled
        axes.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.

    Examples
    --------
    >>> pd.read_sql_table('table_name', 'postgres:///db_name')  # doctest:+SKIP
    """
    pandas_sql = pandasSQL_builder(con, schema=schema)
    if not pandas_sql.has_table(table_name):
        raise ValueError(f"Table {table_name} not found")

    table = pandas_sql.read_table(
        table_name,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )

    # read_table should not come back empty after has_table succeeded, but
    # keep the original defensive check.
    if table is None:
        raise ValueError(f"Table {table_name} not found", con)
    return table
301
302
303
@overload
def read_sql_query(
    sql,
    con,
    index_col=...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    chunksize: None = ...,
    dtype: DtypeArg | None = ...,
) -> DataFrame:
    # overload: without a chunksize the full result set is one DataFrame
    ...
315
316
317
@overload
def read_sql_query(
    sql,
    con,
    index_col=...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    chunksize: int = ...,
    dtype: DtypeArg | None = ...,
) -> Iterator[DataFrame]:
    # overload: with an int chunksize an iterator of DataFrame chunks is returned
    ...
329
330
331
def read_sql_query(
    sql,
    con,
    index_col=None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : str SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.

        .. versionadded:: 1.3.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.
    """
    # DOC FIX: the dtype example previously used typographic curly quotes
    # (mojibake from the docs); normalized to plain ASCII quotes above.
    pandas_sql = pandasSQL_builder(con)
    return pandas_sql.read_query(
        sql,
        index_col=index_col,
        params=params,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        chunksize=chunksize,
        dtype=dtype,
    )
408
409
410
@overload
def read_sql(
    sql,
    con,
    index_col=...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns=...,
    chunksize: None = ...,
) -> DataFrame:
    # overload: without a chunksize the full result is one DataFrame
    ...
422
423
424
@overload
def read_sql(
    sql,
    con,
    index_col=...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns=...,
    chunksize: int = ...,
) -> Iterator[DataFrame]:
    # overload: with an int chunksize an iterator of DataFrame chunks is returned
    ...
436
437
438
def read_sql(
    sql,
    con,
    index_col: str | Sequence[str] | None = None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    columns=None,
    chunksize: int | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query`` (for backward compatibility). It will delegate
    to the specific function depending on the provided input. A SQL query
    will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable (select or text object)
        SQL query to be executed or a table name.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is
        responsible for engine disposal and connection closure for the
        SQLAlchemy connectable; str connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table (only used when reading
        a table).
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the
        number of rows to include in each chunk.

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    Read data from SQL via either a SQL query or a SQL tablename.
    When using a SQLite database only SQL queries are accepted,
    providing only the SQL tablename will result in an error.

    >>> from sqlite3 import connect
    >>> conn = connect(':memory:')
    >>> df = pd.DataFrame(data=[[0, '10/11/12'], [1, '12/11/10']],
    ...                   columns=['int_column', 'date_column'])
    >>> df.to_sql('test_data', conn)
    2

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data', conn)
       int_column date_column
    0           0    10/11/12
    1           1    12/11/10

    >>> pd.read_sql('test_data', 'postgres:///db_name')  # doctest:+SKIP

    Apply date parsing to columns through the ``parse_dates`` argument;
    the values are passed on to :func:`pandas.to_datetime`, e.g. a custom
    format:

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"format": "%d/%m/%y"}})
       int_column date_column
    0           0  2012-11-10
    1           1  2010-11-12
    """
    pandas_sql = pandasSQL_builder(con)

    # The SQLite fallback cannot address tables by name, so everything is
    # treated as a query there.
    if isinstance(pandas_sql, SQLiteDatabase):
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )

    try:
        _is_table_name = pandas_sql.has_table(sql)
    except Exception:
        # using generic exception to catch errors from sql drivers (GH24988)
        _is_table_name = False

    if not _is_table_name:
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )

    pandas_sql.meta.reflect(bind=pandas_sql.connectable, only=[sql])
    return pandas_sql.read_table(
        sql,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )
)
600
601
602
def to_sql(
    frame,
    name: str,
    con,
    schema: str | None = None,
    if_exists: str = "fail",
    index: bool = True,
    index_label=None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    method: str | None = None,
    engine: str = "auto",
    **engine_kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame, Series
    name : str
        Name of SQL table.
    con : SQLAlchemy connectable(engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, optional
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : bool, default True
        Write DataFrame index as a column.
    index_label : str or sequence, optional
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 fallback mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        - None : Uses standard SQL ``INSERT`` clause (one per row).
        - ``'multi'``: Pass multiple values in a single ``INSERT`` clause.
        - callable with signature ``(pd_table, conn, keys, data_iter) -> int | None``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    engine : {'auto', 'sqlalchemy'}, default 'auto'
        SQL engine library to use. If 'auto', then the option
        ``io.sql.engine`` is used. The default ``io.sql.engine``
        behavior is 'sqlalchemy'

        .. versionadded:: 1.3.0

    **engine_kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    None or int
        Number of rows affected by to_sql. None is returned if the callable
        passed into ``method`` does not return the number of rows.

        .. versionadded:: 1.4.0

    Raises
    ------
    ValueError
        If `if_exists` is not one of 'fail', 'replace' or 'append', or if the
        table exists and ``if_exists='fail'``.
    NotImplementedError
        If `frame` is neither a Series nor a DataFrame.

    Notes
    -----
    The returned rows affected is the sum of the ``rowcount`` attribute of ``sqlite3.Cursor``
    or SQLAlchemy connectable. The returned value may not reflect the exact number of written
    rows as stipulated in the
    `sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
    `SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
    """  # noqa:E501
    # DOC FIX: the original docstring claimed if_exists='fail' "does nothing";
    # the implementation (SQLTable.create) actually raises ValueError.
    # Validate if_exists up front so a typo fails fast, before touching con.
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    pandas_sql = pandasSQL_builder(con, schema=schema)

    if isinstance(frame, Series):
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame"
        )

    return pandas_sql.to_sql(
        frame,
        name,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        schema=schema,
        chunksize=chunksize,
        dtype=dtype,
        method=method,
        engine=engine,
        **engine_kwargs,
    )
710
711
712
def has_table(table_name: str, con, schema: str | None = None):
    """
    Check if DataBase has named table.

    Parameters
    ----------
    table_name: string
        Name of SQL table.
    con: SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor supports
        this). If None, use default schema (default).

    Returns
    -------
    boolean
    """
    return pandasSQL_builder(con, schema=schema).has_table(table_name)
734
735
736
table_exists = has_table  # alias kept for backwards compatibility
737
738
739
def pandasSQL_builder(con, schema: str | None = None):
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters.

    Parameters
    ----------
    con : sqlite3.Connection, SQLAlchemy connectable, str, or None
        Connection or URI; None and sqlite3 connections use the SQLite
        fallback wrapper.
    schema : str, optional
        Schema name (only honored by the SQLAlchemy-backed wrapper).
    """
    # sqlite3 is imported lazily; `warnings` is already imported at module
    # level (the redundant local import was removed).
    import sqlite3

    if isinstance(con, sqlite3.Connection) or con is None:
        return SQLiteDatabase(con)

    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

    if isinstance(con, str):
        if sqlalchemy is None:
            raise ImportError("Using URI string without sqlalchemy installed.")
        else:
            con = sqlalchemy.create_engine(con)

    if sqlalchemy is not None and isinstance(con, sqlalchemy.engine.Connectable):
        return SQLDatabase(con, schema=schema)

    # BUG FIX: the message fragments below were concatenated without
    # separating whitespace, producing a garbled warning.
    warnings.warn(
        "pandas only support SQLAlchemy connectable(engine/connection) or "
        "database string URI or sqlite3 DBAPI2 connection "
        "other DBAPI2 objects are not tested, please consider using SQLAlchemy",
        UserWarning,
    )
    return SQLiteDatabase(con)
768
769
770
class SQLTable(PandasObject):
771
"""
772
For mapping Pandas tables to SQL tables.
773
Uses fact that table is reflected by SQLAlchemy to
774
do better type conversions.
775
Also holds various flags needed to avoid having to
776
pass them between functions all the time.
777
"""
778
779
# TODO: support for multiIndex
780
781
def __init__(
782
self,
783
name: str,
784
pandas_sql_engine,
785
frame=None,
786
index=True,
787
if_exists="fail",
788
prefix="pandas",
789
index_label=None,
790
schema=None,
791
keys=None,
792
dtype: DtypeArg | None = None,
793
):
794
self.name = name
795
self.pd_sql = pandas_sql_engine
796
self.prefix = prefix
797
self.frame = frame
798
self.index = self._index_name(index, index_label)
799
self.schema = schema
800
self.if_exists = if_exists
801
self.keys = keys
802
self.dtype = dtype
803
804
if frame is not None:
805
# We want to initialize based on a dataframe
806
self.table = self._create_table_setup()
807
else:
808
# no data provided, read-only mode
809
self.table = self.pd_sql.get_table(self.name, self.schema)
810
811
if self.table is None:
812
raise ValueError(f"Could not init table '{name}'")
813
814
    def exists(self):
        """Return True if the mapped table already exists in the database."""
        return self.pd_sql.has_table(self.name, self.schema)
816
817
    def sql_schema(self):
        """Return the CREATE TABLE statement for this table as a string."""
        from sqlalchemy.schema import CreateTable

        return str(CreateTable(self.table).compile(self.pd_sql.connectable))
821
822
def _execute_create(self):
823
# Inserting table into database, add to MetaData object
824
if _gt14():
825
self.table = self.table.to_metadata(self.pd_sql.meta)
826
else:
827
self.table = self.table.tometadata(self.pd_sql.meta)
828
self.table.create(bind=self.pd_sql.connectable)
829
830
def create(self):
831
if self.exists():
832
if self.if_exists == "fail":
833
raise ValueError(f"Table '{self.name}' already exists.")
834
elif self.if_exists == "replace":
835
self.pd_sql.drop_table(self.name, self.schema)
836
self._execute_create()
837
elif self.if_exists == "append":
838
pass
839
else:
840
raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
841
else:
842
self._execute_create()
843
844
def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
845
"""
846
Execute SQL statement inserting data
847
848
Parameters
849
----------
850
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
851
keys : list of str
852
Column names
853
data_iter : generator of list
854
Each item contains a list of values to be inserted
855
"""
856
data = [dict(zip(keys, row)) for row in data_iter]
857
result = conn.execute(self.table.insert(), data)
858
return result.rowcount
859
860
def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
861
"""
862
Alternative to _execute_insert for DBs support multivalue INSERT.
863
864
Note: multi-value insert is usually faster for analytics DBs
865
and tables containing a few columns
866
but performance degrades quickly with increase of columns.
867
"""
868
869
from sqlalchemy import insert
870
871
data = [dict(zip(keys, row)) for row in data_iter]
872
stmt = insert(self.table).values(data)
873
result = conn.execute(stmt)
874
return result.rowcount
875
876
    def insert_data(self):
        """
        Split ``self.frame`` into column names and per-column object arrays
        ready for DBAPI insertion, prepending the index level(s) as columns
        when ``self.index`` is set.
        """
        if self.index is not None:
            temp = self.frame.copy()
            temp.index.names = self.index
            try:
                temp.reset_index(inplace=True)
            except ValueError as err:
                raise ValueError(f"duplicate name in index/columns: {err}") from err
        else:
            temp = self.frame

        column_names = list(map(str, temp.columns))
        ncols = len(column_names)
        data_list = [None] * ncols

        for i, (_, ser) in enumerate(temp.items()):
            vals = ser._values
            if vals.dtype.kind == "M":
                # datetime64 -> python datetime objects for the DB driver
                d = vals.to_pydatetime()
            elif vals.dtype.kind == "m":
                # store as integers, see GH#6921, GH#7076
                d = vals.view("i8").astype(object)
            else:
                d = vals.astype(object)

            assert isinstance(d, np.ndarray), type(d)

            if ser._can_hold_na:
                # Note: this will miss timedeltas since they are converted to int
                mask = isna(d)
                d[mask] = None

            # error: No overload variant of "__setitem__" of "list" matches
            # argument types "int", "ndarray"
            data_list[i] = d  # type: ignore[call-overload]

        return column_names, data_list
913
914
def insert(
915
self, chunksize: int | None = None, method: str | None = None
916
) -> int | None:
917
918
# set insert method
919
if method is None:
920
exec_insert = self._execute_insert
921
elif method == "multi":
922
exec_insert = self._execute_insert_multi
923
elif callable(method):
924
exec_insert = partial(method, self)
925
else:
926
raise ValueError(f"Invalid parameter `method`: {method}")
927
928
keys, data_list = self.insert_data()
929
930
nrows = len(self.frame)
931
932
if nrows == 0:
933
return 0
934
935
if chunksize is None:
936
chunksize = nrows
937
elif chunksize == 0:
938
raise ValueError("chunksize argument should be non-zero")
939
940
chunks = (nrows // chunksize) + 1
941
total_inserted = 0
942
with self.pd_sql.run_transaction() as conn:
943
for i in range(chunks):
944
start_i = i * chunksize
945
end_i = min((i + 1) * chunksize, nrows)
946
if start_i >= end_i:
947
break
948
949
chunk_iter = zip(*(arr[start_i:end_i] for arr in data_list))
950
num_inserted = exec_insert(conn, keys, chunk_iter)
951
if num_inserted is None:
952
total_inserted = None
953
else:
954
total_inserted += num_inserted
955
return total_inserted
956
957
    def _query_iterator(
        self,
        result,
        chunksize: int | None,
        columns,
        coerce_float: bool = True,
        parse_dates=None,
    ):
        """Return generator through chunked result set."""
        # NOTE: the chunksize annotation was previously `str | None`,
        # which was wrong — fetchmany takes an int.
        has_read_data = False
        while True:
            data = result.fetchmany(chunksize)
            if not data:
                if not has_read_data:
                    # yield one empty frame so the column labels survive
                    # an empty result set
                    yield DataFrame.from_records(
                        [], columns=columns, coerce_float=coerce_float
                    )
                break
            else:
                has_read_data = True
                self.frame = DataFrame.from_records(
                    data, columns=columns, coerce_float=coerce_float
                )

                self._harmonize_columns(parse_dates=parse_dates)

                if self.index is not None:
                    self.frame.set_index(self.index, inplace=True)

                yield self.frame
988
    def read(self, coerce_float=True, parse_dates=None, columns=None, chunksize=None):
        """Read the mapped table into a DataFrame (or an iterator of chunks)."""
        from sqlalchemy import select

        if columns is not None and len(columns) > 0:
            cols = [self.table.c[n] for n in columns]
            if self.index is not None:
                # the index column(s) must be selected too; prepend them
                # preserving their order
                for idx in self.index[::-1]:
                    cols.insert(0, self.table.c[idx])
            # sqlalchemy 1.4 changed select() to take *columns
            sql_select = select(*cols) if _gt14() else select(cols)
        else:
            sql_select = select(self.table) if _gt14() else self.table.select()

        result = self.pd_sql.execute(sql_select)
        column_names = result.keys()

        if chunksize is not None:
            return self._query_iterator(
                result,
                chunksize,
                column_names,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
        else:
            data = result.fetchall()
            self.frame = DataFrame.from_records(
                data, columns=column_names, coerce_float=coerce_float
            )

            self._harmonize_columns(parse_dates=parse_dates)

            if self.index is not None:
                self.frame.set_index(self.index, inplace=True)

            return self.frame
1023
1024
def _index_name(self, index, index_label):
1025
# for writing: index=True to include index in sql table
1026
if index is True:
1027
nlevels = self.frame.index.nlevels
1028
# if index_label is specified, set this as index name(s)
1029
if index_label is not None:
1030
if not isinstance(index_label, list):
1031
index_label = [index_label]
1032
if len(index_label) != nlevels:
1033
raise ValueError(
1034
"Length of 'index_label' should match number of "
1035
f"levels, which is {nlevels}"
1036
)
1037
else:
1038
return index_label
1039
# return the used column labels for the index columns
1040
if (
1041
nlevels == 1
1042
and "index" not in self.frame.columns
1043
and self.frame.index.name is None
1044
):
1045
return ["index"]
1046
else:
1047
return com.fill_missing_names(self.frame.index.names)
1048
1049
# for reading: index=(list of) string to specify column to set as index
1050
elif isinstance(index, str):
1051
return [index]
1052
elif isinstance(index, list):
1053
return index
1054
else:
1055
return None
1056
1057
def _get_column_names_and_types(self, dtype_mapper):
1058
column_names_and_types = []
1059
if self.index is not None:
1060
for i, idx_label in enumerate(self.index):
1061
idx_type = dtype_mapper(self.frame.index._get_level_values(i))
1062
column_names_and_types.append((str(idx_label), idx_type, True))
1063
1064
column_names_and_types += [
1065
(str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False)
1066
for i in range(len(self.frame.columns))
1067
]
1068
1069
return column_names_and_types
1070
1071
def _create_table_setup(self):
    """
    Build (but do not create) a SQLAlchemy ``Table`` mirroring the frame.

    The table is attached to a fresh, throwaway ``MetaData`` so nothing is
    bound to ``self.pd_sql.meta`` before the table actually exists.
    """
    from sqlalchemy import (
        Column,
        PrimaryKeyConstraint,
        Table,
    )
    from sqlalchemy.schema import MetaData

    columns = [
        Column(name, typ, index=is_index)
        for name, typ, is_index in self._get_column_names_and_types(
            self._sqlalchemy_type
        )
    ]

    if self.keys is not None:
        key_list = self.keys if is_list_like(self.keys) else [self.keys]
        columns.append(PrimaryKeyConstraint(*key_list, name=self.name + "_pk"))

    schema = self.schema or self.pd_sql.meta.schema

    return Table(self.name, MetaData(), *columns, schema=schema)
def _harmonize_columns(self, parse_dates=None):
    """
    Make the DataFrame's column types align with the SQL table
    column types.
    Need to work around limited NA value support. Floats are always
    fine, ints must always be floats if there are Null values.
    Booleans are hard because converting bool column with None replaces
    all Nones with false. Therefore only convert bool if there are no
    NA values.
    Datetimes should already be converted to np.datetime64 if supported,
    but here we also force conversion if required.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # Walk the SQL table's columns (not the frame's) so SQL-declared types
    # drive the coercion; frame columns absent from the result are skipped
    # via the KeyError handler below.
    for sql_col in self.table.columns:
        col_name = sql_col.name
        try:
            df_col = self.frame[col_name]

            # Handle date parsing upfront; don't try to convert columns
            # twice
            if col_name in parse_dates:
                try:
                    fmt = parse_dates[col_name]
                except TypeError:
                    # parse_dates was a plain list of names, not a dict of
                    # per-column formats.
                    fmt = None
                self.frame[col_name] = _handle_date_column(df_col, format=fmt)
                continue

            # the type the dataframe column should have
            col_type = self._get_dtype(sql_col.type)

            if (
                col_type is datetime
                or col_type is date
                or col_type is DatetimeTZDtype
            ):
                # Convert tz-aware Datetime SQL columns to UTC
                utc = col_type is DatetimeTZDtype
                self.frame[col_name] = _handle_date_column(df_col, utc=utc)
            elif col_type is float:
                # floats support NA, can always convert!
                self.frame[col_name] = df_col.astype(col_type, copy=False)

            elif len(df_col) == df_col.count():
                # No NA values, can convert ints and bools
                if col_type is np.dtype("int64") or col_type is bool:
                    self.frame[col_name] = df_col.astype(col_type, copy=False)
        except KeyError:
            pass  # this column not in results
def _sqlalchemy_type(self, col):
    """Map a pandas column or index to a SQLAlchemy column type."""
    dtype: DtypeArg = self.dtype or {}
    if is_dict_like(dtype):
        dtype = cast(dict, dtype)
        if col.name in dtype:
            # A user-specified override always wins.
            return dtype[col.name]

    # Infer type of column, while ignoring missing values.
    # Needed for inserting typed data containing NULLs, GH 8778.
    col_type = lib.infer_dtype(col, skipna=True)

    from sqlalchemy.types import (
        TIMESTAMP,
        BigInteger,
        Boolean,
        Date,
        DateTime,
        Float,
        Integer,
        SmallInteger,
        Text,
        Time,
    )

    if col_type in ("datetime64", "datetime"):
        # GH 9086: TIMESTAMP is the suggested type if the column contains
        # timezone information
        try:
            if col.dt.tz is not None:
                return TIMESTAMP(timezone=True)
        except AttributeError:
            # The column is actually a DatetimeIndex
            # GH 26761 or an Index with date-like data e.g. 9999-01-01
            if getattr(col, "tz", None) is not None:
                return TIMESTAMP(timezone=True)
        return DateTime

    if col_type == "timedelta64":
        warnings.warn(
            "the 'timedelta' type is not supported, and will be "
            "written as integer values (ns frequency) to the database.",
            UserWarning,
            stacklevel=find_stack_level(),
        )
        return BigInteger

    if col_type == "floating":
        return Float(precision=23) if col.dtype == "float32" else Float(precision=53)

    if col_type == "integer":
        # GH35076 Map pandas integer to optimal SQLAlchemy integer type
        width = col.dtype.name.lower()
        if width in ("int8", "uint8", "int16"):
            return SmallInteger
        if width in ("uint16", "int32"):
            return Integer
        if width == "uint64":
            raise ValueError("Unsigned 64 bit integer datatype is not supported")
        return BigInteger

    if col_type == "boolean":
        return Boolean
    if col_type == "date":
        return Date
    if col_type == "time":
        return Time
    if col_type == "complex":
        raise ValueError("Complex datatypes not supported")

    return Text
def _get_dtype(self, sqltype):
    """Translate a SQLAlchemy column type into the target frame dtype."""
    from sqlalchemy.types import (
        TIMESTAMP,
        Boolean,
        Date,
        DateTime,
        Float,
        Integer,
    )

    if isinstance(sqltype, Float):
        return float
    if isinstance(sqltype, Integer):
        # TODO: Refine integer size.
        return np.dtype("int64")
    # NOTE: TIMESTAMP must be checked before DateTime (it is a subclass).
    if isinstance(sqltype, TIMESTAMP):
        # we have a timezone capable type
        return DatetimeTZDtype if sqltype.timezone else datetime
    if isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return datetime
    if isinstance(sqltype, Date):
        return date
    if isinstance(sqltype, Boolean):
        return bool
    return object
class PandasSQL(PandasObject):
1254
"""
1255
Subclasses Should define read_sql and to_sql.
1256
"""
1257
1258
def read_sql(self, *args, **kwargs):
1259
raise ValueError(
1260
"PandasSQL must be created with an SQLAlchemy "
1261
"connectable or sqlite connection"
1262
)
1263
1264
def to_sql(
1265
self,
1266
frame,
1267
name,
1268
if_exists="fail",
1269
index=True,
1270
index_label=None,
1271
schema=None,
1272
chunksize=None,
1273
dtype: DtypeArg | None = None,
1274
method=None,
1275
) -> int | None:
1276
raise ValueError(
1277
"PandasSQL must be created with an SQLAlchemy "
1278
"connectable or sqlite connection"
1279
)
1280
1281
1282
class BaseEngine:
    # Abstract insertion strategy: concrete engines implement the actual
    # row insertion for an already-created table.
    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index=True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """
        Inserts data into already-prepared table

        Returns the number of rows inserted, or None when the driver does
        not report a count.  Always raises here; subclasses must override.
        """
        raise AbstractMethodError(self)
class SQLAlchemyEngine(BaseEngine):
    """Default insertion engine, backed by SQLAlchemy."""

    def __init__(self):
        # Fails with a helpful ImportError when SQLAlchemy is absent.
        import_optional_dependency(
            "sqlalchemy", extra="sqlalchemy is required for SQL support."
        )

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index=True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """Insert ``frame``'s rows through the prepared ``table``."""
        from sqlalchemy import exc

        try:
            return table.insert(chunksize=chunksize, method=method)
        except exc.SQLAlchemyError as err:
            # GH34431
            # https://stackoverflow.com/a/67358288/6067848
            msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?#
)|inf can not be used with MySQL"""
            if re.search(msg, str(err.orig)):
                raise ValueError("inf cannot be used with MySQL") from err
            raise err
def get_engine(engine: str) -> BaseEngine:
    """return our implementation"""
    if engine == "auto":
        engine = get_option("io.sql.engine")

    if engine == "auto":
        # try engines in this order
        failures = []
        for engine_class in (SQLAlchemyEngine,):
            try:
                return engine_class()
            except ImportError as err:
                failures.append(str(err))

        error_msgs = "".join("\n - " + failure for failure in failures)
        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'sqlalchemy'.\n"
            "A suitable version of "
            "sqlalchemy is required for sql I/O "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "sqlalchemy":
        return SQLAlchemyEngine()

    raise ValueError("engine must be one of 'auto', 'sqlalchemy'")
class SQLDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using SQLAlchemy to handle DataBase abstraction.

    Parameters
    ----------
    engine : SQLAlchemy connectable
        Connectable to connect with the database. Using SQLAlchemy makes it
        possible to use any DB supported by that library.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).

    """

    def __init__(self, engine, schema: str | None = None):
        from sqlalchemy.schema import MetaData

        self.connectable = engine
        self.meta = MetaData(schema=schema)

    @contextmanager
    def run_transaction(self):
        """Yield a connection wrapped in a transaction (commit/rollback)."""
        from sqlalchemy.engine import Engine

        # An Engine needs an explicit connection + transaction; anything
        # else (an existing Connection) is yielded as-is.
        if isinstance(self.connectable, Engine):
            with self.connectable.connect() as conn:
                with conn.begin():
                    yield conn
        else:
            yield self.connectable

    def execute(self, *args, **kwargs):
        """Simple passthrough to SQLAlchemy connectable"""
        return self.connectable.execution_options().execute(*args, **kwargs)

    def read_table(
        self,
        table_name: str,
        index_col: str | Sequence[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
    ):
        """
        Read SQL database table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Name of SQL table in database.
        index_col : string, optional, default: None
            Column to set as index.
        coerce_float : bool, default True
            Attempts to convert values of non-string, non-numeric objects
            (like decimal.Decimal) to floating point. This can result in
            loss of precision.
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg}``, where the arg corresponds
              to the keyword arguments of :func:`pandas.to_datetime`.
              Especially useful with databases without native Datetime support,
              such as SQLite.
        columns : list, default: None
            List of column names to select from SQL table.
        schema : string, default None
            Name of SQL schema in database to query (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQL database object.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query

        """
        table = SQLTable(table_name, self, index=index_col, schema=schema)
        return table.read(
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
        )

    @staticmethod
    def _query_iterator(
        result,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float=True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        while True:
            data = result.fetchmany(chunksize)
            if not data:
                if not has_read_data:
                    # Yield one empty (correctly-labeled) frame even when
                    # the query matched no rows.
                    yield _wrap_result(
                        [],
                        columns,
                        index_col=index_col,
                        coerce_float=coerce_float,
                        parse_dates=parse_dates,
                    )
                break
            else:
                has_read_data = True
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                    dtype=dtype,
                )

    def read_query(
        self,
        sql: str,
        index_col: str | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
    ):
        """
        Read SQL query into a DataFrame.

        Parameters
        ----------
        sql : str
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : bool, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method. The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        dtype : Type name or dict of columns
            Data type for data or columns. E.g. np.float64 or
            {‘a’: np.float64, ‘b’: np.int32, ‘c’: ‘Int64’}

            .. versionadded:: 1.3.0

        Returns
        -------
        DataFrame

        See Also
        --------
        read_sql_table : Read SQL database table into a DataFrame.
        read_sql

        """
        args = _convert_params(sql, params)

        result = self.execute(*args)
        columns = result.keys()

        if chunksize is not None:
            return self._query_iterator(
                result,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
            )
        else:
            data = result.fetchall()
            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
            )
            return frame

    read_sql = read_query

    def prep_table(
        self,
        frame,
        name,
        if_exists="fail",
        index=True,
        index_label=None,
        schema=None,
        dtype: DtypeArg | None = None,
    ) -> SQLTable:
        """
        Prepares table in the database for data insertion. Creates it if needed, etc.
        """
        if dtype:
            if not is_dict_like(dtype):
                # A scalar dtype applies to every column of the frame.
                # error: Value expression in dictionary comprehension has incompatible
                # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
                # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
                # Type[str], Type[float], Type[int], Type[complex], Type[bool],
                # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
                # dtype[Any], Type[object]]"
                dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]
            else:
                dtype = cast(dict, dtype)

            from sqlalchemy.types import (
                TypeEngine,
                to_instance,
            )

            # Validate every user-supplied type before touching the DB.
            for col, my_type in dtype.items():
                if not isinstance(to_instance(my_type), TypeEngine):
                    raise ValueError(f"The type of {col} is not a SQLAlchemy type")

        table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()
        return table

    def check_case_sensitive(
        self,
        name,
        schema,
    ):
        """
        Checks table name for issues with case-sensitivity.
        Method is called after data is inserted.
        """
        if not name.isdigit() and not name.islower():
            # check for potentially case sensitivity issues (GH7815)
            # Only check when name is not a number and name is not lower case
            engine = self.connectable.engine
            with self.connectable.connect() as conn:
                if _gt14():
                    from sqlalchemy import inspect

                    insp = inspect(conn)
                    table_names = insp.get_table_names(
                        schema=schema or self.meta.schema
                    )
                else:
                    # Engine.table_names was removed in SQLAlchemy 1.4.
                    table_names = engine.table_names(
                        schema=schema or self.meta.schema, connection=conn
                    )
            if name not in table_names:
                msg = (
                    f"The provided table name '{name}' is not found exactly as "
                    "such in the database after writing the table, possibly "
                    "due to case sensitivity issues. Consider using lower "
                    "case table names."
                )
                warnings.warn(msg, UserWarning)

    def to_sql(
        self,
        frame,
        name,
        if_exists="fail",
        index=True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
        engine="auto",
        **engine_kwargs,
    ) -> int | None:
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column.
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a SQLAlchemy type. If all columns are of the same type, one
            single value can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.
        engine : {'auto', 'sqlalchemy'}, default 'auto'
            SQL engine library to use. If 'auto', then the option
            ``io.sql.engine`` is used. The default ``io.sql.engine``
            behavior is 'sqlalchemy'

            .. versionadded:: 1.3.0

        **engine_kwargs
            Any additional kwargs are passed to the engine.
        """
        sql_engine = get_engine(engine)

        table = self.prep_table(
            frame=frame,
            name=name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )

        total_inserted = sql_engine.insert_records(
            table=table,
            con=self.connectable,
            frame=frame,
            name=name,
            index=index,
            schema=schema,
            chunksize=chunksize,
            method=method,
            **engine_kwargs,
        )

        self.check_case_sensitive(name=name, schema=schema)
        return total_inserted

    @property
    def tables(self):
        # Tables currently reflected into this object's MetaData.
        return self.meta.tables

    def has_table(self, name: str, schema: str | None = None):
        """Return True if ``name`` exists in the database (or ``schema``)."""
        if _gt14():
            from sqlalchemy import inspect

            insp = inspect(self.connectable)
            return insp.has_table(name, schema or self.meta.schema)
        else:
            return self.connectable.run_callable(
                self.connectable.dialect.has_table, name, schema or self.meta.schema
            )

    def get_table(self, table_name: str, schema: str | None = None):
        """Reflect ``table_name`` into a SQLAlchemy ``Table`` object."""
        from sqlalchemy import (
            Numeric,
            Table,
        )

        schema = schema or self.meta.schema
        tbl = Table(
            table_name, self.meta, autoload_with=self.connectable, schema=schema
        )
        for column in tbl.columns:
            if isinstance(column.type, Numeric):
                # Return Python floats rather than Decimal objects.
                column.type.asdecimal = False
        return tbl

    def drop_table(self, table_name: str, schema: str | None = None):
        """Drop ``table_name`` if it exists, then clear reflected metadata."""
        schema = schema or self.meta.schema
        if self.has_table(table_name, schema):
            self.meta.reflect(bind=self.connectable, only=[table_name], schema=schema)
            self.get_table(table_name, schema).drop(bind=self.connectable)
            self.meta.clear()

    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        """Return the CREATE TABLE DDL that ``to_sql`` would emit for ``frame``."""
        table = SQLTable(
            table_name,
            self,
            frame=frame,
            index=False,
            keys=keys,
            dtype=dtype,
            schema=schema,
        )
        return str(table.sql_schema())
# ---- SQL without SQLAlchemy ---
# sqlite-specific sql strings and handler class
# dictionary used for readability purposes
# Maps lib.infer_dtype() results to SQLite storage classes (note SQLite
# has no BOOLEAN type, so booleans are stored as INTEGER).
_SQL_TYPES = {
    "string": "TEXT",
    "floating": "REAL",
    "integer": "INTEGER",
    "datetime": "TIMESTAMP",
    "date": "DATE",
    "time": "TIME",
    "boolean": "INTEGER",
}
def _get_unicode_name(name):
1826
try:
1827
uname = str(name).encode("utf-8", "strict").decode("utf-8")
1828
except UnicodeError as err:
1829
raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") from err
1830
return uname
1831
1832
1833
def _get_valid_sqlite_name(name):
1834
# See https://stackoverflow.com/questions/6514274/how-do-you-escape-strings\
1835
# -for-sqlite-table-column-names-in-python
1836
# Ensure the string can be encoded as UTF-8.
1837
# Ensure the string does not include any NUL characters.
1838
# Replace all " with "".
1839
# Wrap the entire thing in double quotes.
1840
1841
uname = _get_unicode_name(name)
1842
if not len(uname):
1843
raise ValueError("Empty table or column name specified")
1844
1845
nul_index = uname.find("\x00")
1846
if nul_index >= 0:
1847
raise ValueError("SQLite identifier cannot contain NULs")
1848
return '"' + uname.replace('"', '""') + '"'
1849
1850
1851
class SQLiteTable(SQLTable):
    """
    Patch the SQLTable for fallback support.
    Instead of a table variable just use the Create Table statement.
    """

    def __init__(self, *args, **kwargs):
        # GH 8341
        # register an adapter callable for datetime.time object
        import sqlite3

        # this will transform time(12,34,56,789) into '12:34:56.000789'
        # (this is what sqlalchemy does)
        sqlite3.register_adapter(time, lambda _: _.strftime("%H:%M:%S.%f"))
        super().__init__(*args, **kwargs)

    def sql_schema(self):
        """Return the full DDL (CREATE TABLE / CREATE INDEX) as one string."""
        return str(";\n".join(self.table))

    def _execute_create(self):
        # NOTE: here self.table is the list of DDL statements produced by
        # _create_table_setup, not a SQLAlchemy Table.
        with self.pd_sql.run_transaction() as conn:
            for stmt in self.table:
                conn.execute(stmt)

    def insert_statement(self, *, num_rows: int):
        """Build a parameterized INSERT statement covering ``num_rows`` rows."""
        names = list(map(str, self.frame.columns))
        wld = "?"  # wildcard char
        escape = _get_valid_sqlite_name

        if self.index is not None:
            # Index columns come first, in their original order.
            for idx in self.index[::-1]:
                names.insert(0, idx)

        bracketed_names = [escape(column) for column in names]
        col_names = ",".join(bracketed_names)

        row_wildcards = ",".join([wld] * len(names))
        wildcards = ",".join([f"({row_wildcards})" for _ in range(num_rows)])
        insert_statement = (
            f"INSERT INTO {escape(self.name)} ({col_names}) VALUES {wildcards}"
        )
        return insert_statement

    def _execute_insert(self, conn, keys, data_iter) -> int:
        """Row-at-a-time insert via executemany; returns affected row count."""
        data_list = list(data_iter)
        conn.executemany(self.insert_statement(num_rows=1), data_list)
        return conn.rowcount

    def _execute_insert_multi(self, conn, keys, data_iter) -> int:
        """Single multi-row INSERT; returns affected row count."""
        data_list = list(data_iter)
        flattened_data = [x for row in data_list for x in row]
        conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
        return conn.rowcount

    def _create_table_setup(self):
        """
        Return a list of SQL statements that creates a table reflecting the
        structure of a DataFrame. The first entry will be a CREATE TABLE
        statement while the rest will be CREATE INDEX statements.
        """
        column_names_and_types = self._get_column_names_and_types(self._sql_type_name)
        escape = _get_valid_sqlite_name

        create_tbl_stmts = [
            escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types
        ]

        if self.keys is not None and len(self.keys):
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            cnames_br = ", ".join([escape(c) for c in keys])
            create_tbl_stmts.append(
                f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})"
            )
        if self.schema:
            schema_name = self.schema + "."
        else:
            schema_name = ""
        create_stmts = [
            "CREATE TABLE "
            + schema_name
            + escape(self.name)
            + " (\n"
            + ",\n ".join(create_tbl_stmts)
            + "\n)"
        ]

        ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index]
        if len(ix_cols):
            cnames = "_".join(ix_cols)
            cnames_br = ",".join([escape(c) for c in ix_cols])
            create_stmts.append(
                "CREATE INDEX "
                + escape("ix_" + self.name + "_" + cnames)
                # BUG FIX: previously '"ON "' was concatenated with no space
                # after the quoted index name ('"ix_t_c"ON "t"'), which only
                # parsed because SQLite's tokenizer splits at the closing
                # quote; other tools reading this DDL could choke.
                + " ON "
                + escape(self.name)
                + " ("
                + cnames_br
                + ")"
            )

        return create_stmts

    def _sql_type_name(self, col):
        """Map a pandas column to a SQLite storage-type string."""
        dtype: DtypeArg = self.dtype or {}
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
            if col.name in dtype:
                # User-specified override wins.
                return dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the database.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            col_type = "integer"

        elif col_type == "datetime64":
            col_type = "datetime"

        elif col_type == "empty":
            col_type = "string"

        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        if col_type not in _SQL_TYPES:
            col_type = "string"

        return _SQL_TYPES[col_type]
class SQLiteDatabase(PandasSQL):
1992
"""
1993
Version of SQLDatabase to support SQLite connections (fallback without
1994
SQLAlchemy). This should only be used internally.
1995
1996
Parameters
1997
----------
1998
con : sqlite connection object
1999
2000
"""
2001
2002
def __init__(self, con):
    # Raw DBAPI2 (sqlite3-style) connection; used directly by the
    # cursor-based helpers below.
    self.con = con
@contextmanager
def run_transaction(self):
    """
    Yield a cursor inside a transaction: commit on success, roll back on
    any exception, and always close the cursor.
    """
    cursor = self.con.cursor()
    try:
        yield cursor
        self.con.commit()
    except Exception:
        self.con.rollback()
        raise
    finally:
        cursor.close()
def execute(self, *args, **kwargs):
    """
    Execute ``args`` on a fresh cursor and return it; on failure, attempt a
    rollback and re-raise as ``DatabaseError``.
    """
    cursor = self.con.cursor()
    try:
        cursor.execute(*args, **kwargs)
        return cursor
    except Exception as exc:
        try:
            self.con.rollback()
        except Exception as inner_exc:  # pragma: no cover
            # Rollback itself failed: surface both problems.
            raise DatabaseError(
                f"Execution failed on sql: {args[0]}\n{exc}\nunable to rollback"
            ) from inner_exc

        raise DatabaseError(
            f"Execution failed on sql '{args[0]}': {exc}"
        ) from exc
@staticmethod
2035
def _query_iterator(
2036
cursor,
2037
chunksize: int,
2038
columns,
2039
index_col=None,
2040
coerce_float: bool = True,
2041
parse_dates=None,
2042
dtype: DtypeArg | None = None,
2043
):
2044
"""Return generator through chunked result set"""
2045
has_read_data = False
2046
while True:
2047
data = cursor.fetchmany(chunksize)
2048
if type(data) == tuple:
2049
data = list(data)
2050
if not data:
2051
cursor.close()
2052
if not has_read_data:
2053
yield DataFrame.from_records(
2054
[], columns=columns, coerce_float=coerce_float
2055
)
2056
break
2057
else:
2058
has_read_data = True
2059
yield _wrap_result(
2060
data,
2061
columns,
2062
index_col=index_col,
2063
coerce_float=coerce_float,
2064
parse_dates=parse_dates,
2065
dtype=dtype,
2066
)
2067
2068
def read_query(
    self,
    sql,
    index_col=None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
):
    """Execute ``sql`` and return a DataFrame (or an iterator of chunks)."""
    cursor = self.execute(*_convert_params(sql, params))
    columns = [col_desc[0] for col_desc in cursor.description]

    if chunksize is not None:
        # Lazy path: the iterator owns (and eventually closes) the cursor.
        return self._query_iterator(
            cursor,
            chunksize,
            columns,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            dtype=dtype,
        )

    data = self._fetchall_as_list(cursor)
    cursor.close()

    return _wrap_result(
        data,
        columns,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        dtype=dtype,
    )
def _fetchall_as_list(self, cur):
2108
result = cur.fetchall()
2109
if not isinstance(result, list):
2110
result = list(result)
2111
return result
2112
2113
def to_sql(
    self,
    frame,
    name,
    if_exists="fail",
    index=True,
    index_label=None,
    schema=None,
    chunksize=None,
    dtype: DtypeArg | None = None,
    method=None,
    **kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame
    name : str
        Name of SQL table.
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        fail: do not write if the table already exists.
        (NOTE(review): the public ``DataFrame.to_sql`` docs say this
        raises ``ValueError`` — confirm against ``SQLiteTable.create``.)
        replace: If table exists, drop it, recreate it, and insert data.
        append: If table exists, insert data. Create if it does not exist.
    index : bool, default True
        Write DataFrame index as a column.
    index_label : str or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : str, default None
        Ignored parameter included for compatibility with the SQLAlchemy
        version of ``to_sql``.
    chunksize : int, default None
        If not None, rows are written in batches of this size; otherwise
        all rows are written at once.
    dtype : single type or dict of column name to SQL type, default None
        Optional datatype for columns. The SQL type must be a string; a
        single value applies to every column.
    method : {None, 'multi', callable}, default None
        Controls the SQL insertion clause used:

        * None : Uses standard SQL ``INSERT`` clause (one per row).
        * 'multi': Pass multiple values in a single ``INSERT`` clause.
        * callable with signature ``(pd_table, conn, keys, data_iter)``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    """
    if dtype:
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
        else:
            # A single SQL type was given: apply it to every column.
            # error: Value expression in dictionary comprehension has
            # incompatible type (single-dtype case narrows poorly).
            dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]

        # The sqlite3 fallback only understands SQL types as strings.
        for column, sql_type in dtype.items():
            if not isinstance(sql_type, str):
                raise ValueError(f"{column} ({sql_type}) not a string")

    table = SQLiteTable(
        name,
        self,
        frame=frame,
        index=index,
        if_exists=if_exists,
        index_label=index_label,
        dtype=dtype,
    )
    table.create()
    return table.insert(chunksize, method)
2191
2192
def has_table(self, name: str, schema: str | None = None):
    """
    Return True if a table called ``name`` exists.

    ``schema`` is accepted for interface compatibility with the
    SQLAlchemy-backed implementation and is ignored here.
    """
    # Parameterized lookup against sqlite's catalog table.
    query = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
    rows = self.execute(query, [name]).fetchall()
    return len(rows) > 0
2198
2199
def get_table(self, table_name: str, schema: str | None = None):
    """Always return None: table reflection is not supported in fallback mode."""
    return None
2201
2202
def drop_table(self, name: str, schema: str | None = None):
    """Drop table ``name``, quoting it via ``_get_valid_sqlite_name``."""
    self.execute(f"DROP TABLE {_get_valid_sqlite_name(name)}")
2205
2206
def _create_sql_schema(
    self,
    frame,
    table_name: str,
    keys=None,
    dtype: DtypeArg | None = None,
    schema: str | None = None,
):
    """Return the CREATE TABLE statement for ``frame`` as a string."""
    # Build a throwaway table object purely to render its schema;
    # nothing is written to the database here.
    return str(
        SQLiteTable(
            table_name,
            self,
            frame=frame,
            index=False,
            keys=keys,
            dtype=dtype,
            schema=schema,
        ).sql_schema()
    )
2224
2225
2226
def get_schema(
    frame,
    name: str,
    keys=None,
    con=None,
    dtype: DtypeArg | None = None,
    schema: str | None = None,
):
    """
    Get the SQL db table schema for the given frame.

    Parameters
    ----------
    frame : DataFrame
    name : str
        Name of the SQL table.
    keys : str or sequence, default None
        Column(s) to use as the primary key.
    con : an open SQL database connection object or a SQLAlchemy
        connectable, default None
        Using SQLAlchemy makes it possible to use any DB supported by
        that library. If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional datatype for columns. The SQL type should be a
        SQLAlchemy type, or a string for a sqlite3 fallback connection.
    schema : str, default None
        Optional schema to be used in creating the table.

        .. versionadded:: 1.2.0
    """
    # Delegate to whichever backend (SQLAlchemy or sqlite3 fallback)
    # the builder selects for this connection.
    return pandasSQL_builder(con=con)._create_sql_schema(
        frame, name, keys=keys, dtype=dtype, schema=schema
    )
2260
2261