0%

Django 操作阿里云表格存储 Tablestore

Tablestore 的配置

首先,开通阿里云表格存储服务。连接:https://www.aliyun.com/product/ots?spm=a2c4g.11186623.2.7.6f0b23a5RBru3P

开通好之后,创建一个实例:

在开始之前,还要创建 Access Key。

Access Key 的创建参见:https://help.aliyun.com/document_detail/53045.html?spm=a2c4g.11186623.2.12.7ed845d1l6vPRz

需要注意的是,AccessKeySecret 只在创建时显示,不要一出来就手滑关闭。等稍后,我们把它记录下来只会,再关不迟。而且不要忘了把表格存储 AliyunOTSFullAccess 加入到新创建的用户的权限中。

Python 操作 TableStore 的 SDK:https://github.com/aliyun/aliyun-tablestore-python-sdk

阿里云官方 Python SDK:https://help.aliyun.com/document_detail/31723.html?spm=a2c4g.11186623.6.891.563c3d76sdVMpI

首先安装 Tablestore 的 Python 模块:

1
pip install tablestore

settings/dev.py,添加 TableStore 的 API 接口配置:

1
2
3
4
5
# tablestore
OTS_ID = "LTAI4FxkXpCaQwsNTMzxGmMk" # AccessKeyId
OTS_SECRET = "AxS8Y0m3US8prS8XJuqkHDpd2XGRii" # AccessKeySecret
OTS_INSTANCE = "renrantablestore" # 实例名称
OTS_ENDPOINT = "https://renranzixun.cn-hangzhou.ots.aliyuncs.com" # 实例访问公网地址

Tablestore 目前只支持四种数据类型:INTEGER、STRING、DOUBLE 和 BOOLEAN。其中 DOUBLE 类型不能做主键类型,BOOLEAN 不可以做主键的第一列(分区键)。

为了方便演示,我们另外创建一个单独的子应用 store 来编写 tablestore 的代码:

1
2
cd renranapi/apps
python ../../manage.py startapp store

settings/dev.py 注册:

1
2
3
4
INSALL_APPS = [
...
'store', # 用于演示tableStore,后续删除掉即可
]

表操作

参见:创建表

Tablestore 里面创建表的时候必须设置表名,主键列,还有表元信息和表的描述项(有效期,版本,吞吐量)。

创建表的时候,除了主键列以外,还可以设置预设字段列。这个不常用,因为当前使用 tablestore 是 NOSQL 数据,所以我们表结构的字段列可以在添加数据的时候再指定。

视图代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import tablestore as ts
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

class TableStoreAPIView(APIView):
@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def post(self, request):
"""创建表"""
# 设置主键和字段
table_name = 'user_message_table' # 如果表名已存在,会报错
# schema_of_primary_key = [
# ('字段名', '字段类型', ts.PK_AUTO_INCR),
# ('uid', 'STRING')
# ]
# 字段类型包括:INTEGER、STRING、DOUBLE 和 BOOLEAN
# 主键列
schema_of_primary_key = [
('user_id', 'INTEGER'),
('sequence_id', 'INTEGER', ts.PK_AUTO_INCR),
('sender_id', 'INTEGER'),
('message_id', 'INTEGER'),
]
# 设置表的元信息
table_meta = ts.TableMeta(table_name, schema_of_primary_key)
# 设置数据的有效期,数据保留7*68400秒,超过后自动删除;最大5个版本;
table_options = ts.TableOptions(7 * 68400, 5)
# 设置数据的预留读写吞吐量
reserved_throughput = ts.ReservedThroughput(ts.CapacityUnit(0, 0))
# 创建数据
self.client.create_table(table_meta, table_options, reserved_throughput)
return Response({'message': 'ok'})

def delete(self, request):
"""删除表"""
table_name = 'user_message_table'
self.client.delete_table(table_name)
return Response({'message': 'ok'}, status=status.HTTP_204_NO_CONTENT)

def get(self, request):
"""列出所有的表"""
# ["user_message_table","user_message_table1","user_message_table2"]
table_list = self.client.list_table()
return Response(table_list)

注意:

  1. 通过 postman 测试各个接口。
  2. 创建表后需要等待 1 分钟进行加载,在此期间对该表的读/写数据操作有可能会失败。
    应用程序应该等待表加载完毕后再进行数据操作。
  3. 创建表格存储的表时必须指定表的主键。
    主键包含 1~4 个主键列,每一个主键列都有名字和类型。

路由:

1
2
3
4
5
6
7
8
9
10
11
12
from django.urls import path
from . import views

urlpatterns = [
path('table/', views.TableStoreAPIView.as_view())
]

# 总路由
urlpatterns = [
...
path(r'ots/', include('tstore.urls')),
]

一条数据的操作

参见:单行数据操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class RowAPIView(APIView):
@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def post(self, request):
"""添加一条数据到表格中"""
table_name = 'user_message_table'
# 新增数据的主键列
primary_key = [
# ('主键名', 值),
('user_id', 3), # 接收Feed的用户ID
('sequence_id', ts.PK_AUTO_INCR), # 如果是自增主键,则值为 ts.PK_AUTO_INCR
('sender_id', 1), # 发布Feed的用户ID
('message_id', 4), # 文章ID
]
# 属性列
attribute_columns = [
('receive_time', datetime.now().timestamp()),
('read_status', False),
]
row = ts.Row(primary_key, attribute_columns)
consumed, return_row = self.client.put_row(table_name, row)
return Response({'return_row': return_row})

def get(self, request):
table_name = 'user_message_table'
primary_key = [
('user_id', 3),
('sequence_id', 1586614638036000),
('sender_id', 1),
('message_id', 4),
]
# 需要返回的属性列:。如果columns_to_get为[],则返回所有属性列。
columns_to_get = []
# columns_to_get = ['recevice_time', 'read_status', 'age', 'sex']

consumed, return_row, next_token = self.client.get_row(table_name, primary_key, columns_to_get)

return Response({
'primary_key': return_row.primary_key, # [["user_id",3],["sequence_id",1586614638036000],["sender_id",1],["message_id",4]]
'attribute_columns': return_row.attribute_columns, # [["read_status",false,1586614638036],["receive_time",1586614637.572764,1586614638036]]
'next_toke': next_token,
})

路由,代码:

1
2
3
4
urlpatterns = [
...
path('row/', views.RowAPIView.as_view()),
]

多条数据的操作

参见:多行数据操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
from datetime import datetime

import tablestore as ts
from django.conf import settings
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView


class TableStoreAPIView(APIView):
@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def post(self, request):
"""创建表"""
# 设置主键和字段
table_name = 'user_message_table' # 如果表名已存在,会报错
# schema_of_primary_key = [
# ('字段名', '字段类型', ts.PK_AUTO_INCR),
# ('uid', 'STRING')
# ]
# 字段类型包括:INTEGER、STRING、DOUBLE 和 BOOLEAN
# 主键列
schema_of_primary_key = [
('user_id', 'INTEGER'),
('sequence_id', 'INTEGER', ts.PK_AUTO_INCR),
('sender_id', 'INTEGER'),
('message_id', 'INTEGER'),
]
# 设置表的元信息
table_meta = ts.TableMeta(table_name, schema_of_primary_key)
# 设置数据的有效期,数据保留7*68400秒,超过后自动删除;最大5个版本;
table_options = ts.TableOptions(7 * 68400, 5)
# 设置数据的预留读写吞吐量
reserved_throughput = ts.ReservedThroughput(ts.CapacityUnit(0, 0))
# 创建数据
self.client.create_table(table_meta, table_options, reserved_throughput)
return Response({'message': 'ok'})

def delete(self, request):
"""删除表"""
table_name = 'user_message_table'
self.client.delete_table(table_name)
return Response({'message': 'ok'}, status=status.HTTP_204_NO_CONTENT)

def get(self, request):
"""列出所有的表"""
# ["user_message_table","user_message_table1","user_message_table2"]
table_list = self.client.list_table()
return Response(table_list)

class RowAPIView(APIView):
@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def post(self, request):
"""添加一条数据到表格中"""
table_name = 'user_message_table'
# 新增数据的主键列
primary_key = [
# ('主键名', 值),
('user_id', 3), # 接收Feed的用户ID
('sequence_id', ts.PK_AUTO_INCR), # 如果是自增主键,则值为 ts.PK_AUTO_INCR
('sender_id', 1), # 发布Feed的用户ID
('message_id', 4), # 文章ID
]
# 属性列
attribute_columns = [
('receive_time', datetime.now().timestamp()),
('read_status', False),
]
row = ts.Row(primary_key, attribute_columns)
consumed, return_row = self.client.put_row(table_name, row)
return Response({'return_row': return_row})

def get(self, request):
table_name = 'user_message_table'
primary_key = [
('user_id', 3),
('sequence_id', 1586614638036000),
('sender_id', 1),
('message_id', 4),
]
# 需要返回的属性列:。如果columns_to_get为[],则返回所有属性列。
columns_to_get = []
# columns_to_get = ['recevice_time', 'read_status', 'age', 'sex']

consumed, return_row, next_token = self.client.get_row(table_name, primary_key, columns_to_get)

return Response({
'primary_key': return_row.primary_key, # [["user_id",3],["sequence_id",1586614638036000],["sender_id",1],["message_id",4]]
'attribute_columns': return_row.attribute_columns, # [["read_status",false,1586614638036],["receive_time",1586614637.572764,1586614638036]]
'next_toke': next_token,
})

class BatchRowAPIView(APIView):
@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def post(self, request):
"""添加多条数据"""
table_name = 'user_message_table'
put_row_items = []
for i in range(10):
primary_key = [
('user_id', i),
('sequence_id', ts.PK_AUTO_INCR),
('sender_id', i * 2),
('message_id', i + 5),
]
attribute_columns = [
('receive_time', datetime.now().timestamp()),
('read_status', False),
]
row = ts.Row(primary_key, attribute_columns)
condition = ts.Condition(ts.RowExistenceExpectation.IGNORE)
item = ts.PutRowItem(row, condition)
put_row_items.append(item)
req = ts.BatchWriteRowRequest()
req.add(ts.TableInBatchWriteRowItem(table_name, put_row_items))
result = self.client.batch_write_row(req)
return Response(result.is_all_succeed()) # True
def get(self, request):
"""按范围获取多行数据"""
table_name = 'user_message_table'
# 范围查询的起始主键
inclusive_start_primary_key = [
('user_id', ts.INF_MIN),
('sequence_id', ts.INF_MIN), # ts.INF_MIN指代最小的值
('sender_id', ts.INF_MIN),
('message_id', ts.INF_MIN),
]
# 范围查询的结束主键
exclusive_end_primary_key = [
('user_id', ts.INF_MAX),
('sequence_id', ts.INF_MAX), # ts.INF_MAX指代最大值
('sender_id', ts.INF_MAX),
('message_id', ts.INF_MAX),
]
# 查询所有列
# columns_to_get = [] # []表示返回所有列
columns_to_get = ['user_id', 'sequence_id'] # []表示返回所有列
limit = 100

# 设置多条件
cond = ts.CompositeColumnCondition(ts.LogicalOperator.AND) # 逻辑条件
# cond = ts.CompositeColumnCondition(ts.LogicalOperator.OR)
# cond = ts.CompositeColumnCondition(ts.LogicalOperator.NOT)

# 多条件下的子条件
# 比较运算符:EQUAL,NOT_EQUAL,GREATER_THAN,GREATER_EQUAL,LESS_THAN,LESS_EQUAL
cond.add_sub_condition(ts.SingleColumnCondition('user_id', 3, ts.ComparatorType.GREATER_THAN))
cond.add_sub_condition(ts.SingleColumnCondition('user_id', 5, ts.ComparatorType.LESS_EQUAL))
cond.add_sub_condition(ts.SingleColumnCondition('sender_id', 8, ts.ComparatorType.EQUAL))

consumed, next_start_primary_key, row_list, next_token = self.client.get_range(
table_name, # 操作表名
ts.Direction.FORWARD, # 范围的方向,字符串格式,取值包括'FORWARD'和'BACKWARD'
inclusive_start_primary_key, # 起始值
exclusive_end_primary_key, # 终止值
columns_to_get, # 返回字段列
limit, # 结果数量
column_filter=cond, # 过滤条件
max_version=1 # 返回版本数量
)
for row in row_list:
print(row.primary_key, row.attribute_columns)
return Response({
'primary_keys': [row.primary_key for row in row_list],
})

多行数据操作路由代码:

1
2
3
4
urlpatterns = [
...
path('batch/', views.BatchRowAPIView.as_view()),
]

Django 自定义终端命令创建和删除表格

在子应用 home 目录下创建 management 包,并在 management 包中创建命令包目录 commands,commands 下面就可以创建命令模块文件了。【注意,app 子应用必须注册到 INSTALL_APPS 应用列表中】

add_argument() 常用的参数:

  • default:设置参数的默认值

  • version:打印程序版本信息

  • nargs:指定选项后接受参数的个数,如果指定这个参数,值将以列表形式存放,即便只有一个元素也是如此。使用命令时,参数个数必须要和指定的数目保持一致,如果指定为 '*',则可接受任意数量参数

  • help:参数命令的介绍

在 commands 包下面创建命令文件,并在文件中声明命令类,例如:tablestore.py,代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from django.core.management import BaseCommand

class Command(BaseCommand):
help = """测试命令的帮助文档"""

def add_arguments(self, parser):
parser.add_argument('argument', nargs='*', help='必填参数的说明') # 位置参数
parser.add_argument('--option', '-o', default=None, help='可选参数的说明') # 选项参数

def handle(self, *args, **options):
"""命令主方法
options: 参数列表
"""
argument = options.get('argument') # 获取位置参数
option = options.get('option') # 获取选项参数

self.stdout.write(f'argument: {argument}')
self.stdout.write(f'option: {option}')

if option == None:
self.stdout.write('没有设置option选项参数')

终端进入后台 renranapi 根目录,数据命令即可测试:

1
2
3
python manage.py tablestore add
python manage.py tablestore add --option del
python manage.py tablestore add -o del

注意:

  1. 命令类必须继承于 django.core.management.BaseCommand,并且类名必须叫 Command。
  2. 命令名称就是文件名,例如,命令文件叫 tablestore.py,则终端下调用命令为: python manage.py tablestore
  3. 命令参数左边加上 --,则表示可选参数,可选参数建议设置默认值,方便在 handle 方法中判断进行默认处理。如果不设置,默认值默认为 None。
  4. 命令参数如果没有 --,则表示位置参数,则调用命令时,必须为当前命令传递参数,否则报错!

接下来我们直接在项目中提供 TableStore 的操作用于创建 Feed 系统的表结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import tablestore as ts
from django.core.management import BaseCommand
from django.conf import settings

class Command(BaseCommand):
help = """表格存储命令必须接收而且只接收1个命令参数,可选参数如下:
create 表示创建项目使用的表格
delete 表示删除项目中的所有表格(慎用!)
"""
def add_arguments(self, parser):
parser.add_argument('argument', nargs='*', help='操作类型')

def handle(self, *args, **options):
"""表格存储的初始化"""
argument = options.get('argument')
if len(argument) == 1:
if argument[0] == 'create':
"""创建项目所需要的表格"""
self.create_table()
elif argument[0] == 'delete':
"""删除项目中所有表格"""
self.delete_table()
else:
self.stdout.write(self.help)
else:
self.stdout.write(self.help)

@property
def client(self):
return ts.OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE)

def set_table(self, table_name, schema_of_primary_key, time_to_live=-1):
# 设置表的元信息
table_meta = ts.TableMeta(table_name, schema_of_primary_key)
# 设置数据的有效性
table_options = ts.TableOptions(time_to_live, max_version=5)
# 设置数据的预留读写吞吐量
reserved_throughput = ts.ReservedThroughput(ts.CapacityUnit(0, 0))
# 创建数据
self.client.create_table(table_meta, table_options, reserved_throughput)

def create_table(self):
"""建立项目所需的三个数据表:存储库、关系库和未读池"""
# 创建存储库
table_name = 'user_message_table'
schema_of_primary_key =[
('user_id', 'INTEGER'),
('sequence_id', 'INTEGER', ts.PK_AUTO_INCR),
('sender_id', 'INTEGER'),
('message_id', 'INTEGER'),
]
self.set_table(table_name, schema_of_primary_key, time_to_live=7 * 86400)
self.stdout.write(f'表格 {table_name} 创建完成')

# 创建关系库
table_name = 'user_relation_table'
schema_of_primary_key = [
('user_id', 'INTEGER'),
('follower_id', 'INTEGER'),
]
self.set_table(table_name, schema_of_primary_key, time_to_live=7 * 86400)
self.stdout.write(f'表格 {table_name} 创建完成')

# 创建未读池
table_name = 'user_message_session_table'
schema_of_primary_key = [
('user_id', 'INTEGER'),
('last_sequence_id', 'INTEGER'),
]
self.set_table(table_name, schema_of_primary_key, time_to_live=7 * 86400)
self.stdout.write(f'表格 {table_name} 创建完成')

def delete_table(self):
"""删除所有已存在的表"""
table_list = self.client.list_table()
for table in table_list:
self.client.delete_table(table)
self.stdout.write(f'表格 {table} 删除成功!')

以后如果我们就可以直接使用命令创建和删除项目所需要的表格了:

1
2
python manage.py tablestore delete    # 删除所有已存在的表格
python manage.py tablestore create # 创建项目所需的表格