0%

《lua 程序设计》读书笔记(16):线程和状态

Lua 不支持真正的多线程,即不支持共享内存的抢占式线程。Lua 语言的线程(也就是所谓的协程)是协作式的,因此可以避免因为不可预知的线程切换而带来的问题,而且 Lua 状态之间不共享内存,这也为 Lua 实现并行化提供了很好的基础。

这篇文章将深入介绍 Lua 的线程以及 lua_State 数据结构。

多线程

在 Lua 中,协程的本质就是线程,可以认为协程是带有良好编程接口的线程,也可以认为线程是带有底层 API 的协程。从 C API 的角度来看,把线程当做一个栈会比较有用,一个栈包含了一个线程得以运行所需的所有信息。因此多个线程就意味着多个独立的栈

lua_State 不仅表示一个 Lua 状态,还表示带有该状态的一个线程(因此有人认为该类型应该叫做 lua_Thread)。当创建一个 Lua 状态时,Lua 就会自动用这个状态创建一个主线程,并返回代表该线程的 lua_State。该主线程永远不会被垃圾回收,它只会在调用 lua_close 关闭状态时随着状态一起释放。Lua 的 CAPI 的大多数函数都需要操作栈,而这些函数的第一个参数 lua_State 就指定了是哪个线程的栈。

调用 lua_newthread 可以在一个状态中创建其他的线程,他们在内部引用了相同的 Lua 状态。但是每个线程都有自己的栈,新线程从空栈开始运行,而老线程则在栈顶引用这个新线程:

1
lua_State *lua_newthread(lua_State *L);

除了主线程以外,线程和其他 Lua 对象一样都是垃圾回收的对象。当新建一个线程时,新创建的线程会被压入栈中,这样就保证了新线程不会马上被垃圾收集。永远不要使用未被正确锚定(引用)在 Lua 状态中的线程(主线程是内部锚定的)。

一旦拥有一个新线程,就可以像使用主线程一样来使用它了。可以将元素压入栈中,或者从栈中弹出元素,还可以用它来调用函数等。函数 lua_xmove 可以在同一个 Lua 状态的两个栈之间移动 Lua 值。

**使用多线程的主要目的是实现协程,从而可以挂起某些协程的执行,并在之后恢复运行,因此要用到函数 lua_resume**。要启动一个协程,可以像使用 lua_pcall 一样使用 lua_resume:将待调用函数(协程体)压入栈中,然后压入协程的参数,并以参数数量作为 narg 调用 lau_resume。

如果正在运行的函数被挂起(例如通过 lua_yield),lua_resume 就会返回 LUA_YIELD,并将线程置于一个可以后续再恢复执行的状态中。当 lua_resume 返回 LUA_YIELD 时,线程栈的可见部分只包含传递给 yield 的值。要恢复一个挂起的线程,可以再次调用 lua_resume。在这种调用中,Lua 假设栈中所有值都会被调用的 yield 返回。

通常,会将一个 Lua 函数作为协程体启动线程,这个 Lua 函数可以调用其他 Lua 函数,并且其中任意一个函数都可以挂起,从而结束对 lua_resume 的调用。例如:

1
2
function foo(x) coroutine.yield(10, x) end
function foo1(x) foo(x + 1); return 3 end

而运行的 C 代码如下所示:

1
2
3
4
lua_State *L1 = lua_newthread(L);
lua_getglobal(L1, "foo1");
lua_pushinteger(L1, 20);
lua_resume(L1, L, 1);

一个协程也可以调用 C 语言函数,而 C 函数又可以反过来调用其他 Lua 函数。之前介绍过通过 延续函数 可以让 Lua 函数交出控制权。其实 C 语言函数自己也可以交出控制权,此时它必须提供一个在线程恢复时被调用的延续函数。要交出控制权,C 语言函数需要调用 lua_yieldk

1
int lua_yieldk (lua_State *L, int nresults, lua_KContext ctx, lua_KFunction k);

该调用会挂起正在运行的协程。当协程恢复运行时,控制权会直接交给延续函数 k。如果 C 语言函数在交出控制权之后什么都不做,那么它可以不带延续函数调用 lua_yieldk 或者宏 lua_yield

Lua 状态

每次调用 luaL_newstate(或者 lua_newsate) 都会创建一个新的 Lua 状态。不同的 Lua 状态之间是完全独立的,他们不共享数据。这也意味着 Lua 状态之间不能直接通信,必须借助一些 C 语言代码的帮助。由于所有数据都需要由 C 语言进行传递,因此 Lua 状态之间只能交换能够用 C 语言表示的类型。

在支持多线程的系统中,一种设计模型是为每个线程创建一个独立的 Lua 状态。这种设计使得线程类似于 POSIX 线程,实现了非共享内存的并发。如下是该系统的一个简单原型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#include <pthread.h>
#include "lua.h"
#include "lauxlib.h"


typedef struct Proc {
lua_State *L;
pthread_t thread;
pthread_cond_t cond;
const char *channel;
struct Proc *previous, *next;
} Proc;


static Proc *waitsend = NULL;
static Proc *waitreceive = NULL;
static pthread_mutex_t kernel_access = PTHREAD_MUTEX_INITIALIZER;


static Proc *getself(lua_State *L) {
Proc *p;

lua_getfield(L, LUA_REGISTRYINDEX, "_SELF");
p = (Proc*)lua_touserdata(L, -1);
lua_pop(L, 1);
return p;
}


static void movevalues(lua_State *send, lua_State *rec) {
int n = lua_gettop(send);
int i;

luaL_checkstack(rec, n, "too many results");
for (i = 2; i < n; i++) {
lua_pushstring(rec, lua_tostring(send, i));
}
}


static Proc *searchmatch(const char *channel, Proc **list) {
Proc *node;

for (node = *list; node != NULL; node = node->next) {
if (strcmp(channel, node->channel) == 0) {
if (*list == node) {
*list = (node->next == node) ? NULL : node->next;
}

node->previous->next = node->next;
node->next->previous = node->previous;
return node;
}
}

return NULL;
}


static void waitonlist(lua_State *L, const char *channel, Proc **list) {
Proc *p = getself(L);

if (*list == NULL) {
*list = p;
p->previous = p->next = p;
} else {
p->previous = (*list)->previous;
p->next = *list;
p->previous->next = p->next->previous = p;
}

p->channel = channel;

do {
pthread_cond_wait(&p->cond, &kernel_access);
} while (p->channel);
}


static int ll_send(lua_State *L) {
Proc *p;

const char *channel = luaL_checkstring(L, 1);

pthread_mutex_lock(&kernel_access);

p = searchmatch(channel, &waitreceive);
if (p) {
movevalues(L, p->L);
p->channel = NULL;
pthread_cond_signal(&p->cond);
} else {
waitonlist(L, channel, &waitsend);
}

pthread_mutex_unlock(&kernel_access);
return 0;
}


static int ll_receive(lua_State *L) {
Proc *p;

const char *channel = luaL_checkstring(L, 1);
lua_settop(L, 1);

pthread_mutex_lock(&kernel_access);

p = searchmatch(channel, &waitsend);
if (p) {
movevalues(p->L, L);
p->channel = NULL;
pthread_cond_signal(&p->cond);
} else {
waitonlist(L, channel, &waitreceive);
}

pthread_mutex_unlock(&kernel_access);

return lua_gettop(L) - 1;
}

static void registerlib(lua_State *L, const char *name, lua_CFunction f) {
lua_getglobal(L, "package");
lua_getfield(L, -1, "preload");
lua_pushcfunction(L, f);
lua_setfield(L, -2, name);
lua_pop(L, 2);
}

static void openlibs(lua_State *L) {
luaL_requiref(L, "_G", luaopen_base, 1);
luaL_requiref(L, "package", luaopen_package, 1);
lua_pop(L, 2);
registerlib(L, "coroutine", luaopen_coroutine);
registerlib(L, "table", luaopen_table);
registerlib(L, "io", luaopen_io);
registerlib(L, "os", luaopen_os);
registerlib(L, "string", luaopen_string);
registerlib(L, "math", luaopen_math);
registerlib(L, "utf8", luaopen_utf8);
registerlib(L, "debug", luaopen_debug);
}

int luaopen_lproc(lua_State *L);

static void *ll_thread(void *arg) {
lua_State *L = (lua_State *) arg;
Proc *self;

openlibs(L);
luaL_requiref(L, "lproc", luaopen_lproc, 1);
lua_pop(L, 1);

self = (Proc *)lua_newuserdata(L, sizeof(Proc));
lua_setfield(L, LUA_REGISTRYINDEX, "_SELF");
self->L = L;
self->thread = pthread_self();
self->channel = NULL;
pthread_cond_init(&self->cond, NULL);

if (lua_pcall(L, 0, 0, 0) != 0) {
fprintf(stderr, "thread error: %s", lua_tostring(L, -1));
}

pthread_cond_destroy(&getself(L)->cond);
return NULL;
}

static int ll_start(lua_State *L) {
pthread_t thread;

const char *chunk = luaL_checkstring(L, 1);
lua_State *L1 = luaL_newstate();

if (L1 == NULL) {
luaL_error(L, "unable to create new state");
}

if (luaL_loadstring(L1, chunk) != 0) {
luaL_error(L, "error in thread body: %s", lua_tostring(L1, -1));
}

if (pthread_create(&thread, NULL, ll_thread, L1) != 0) {
luaL_error(L, "unable to create new thread");
}

pthread_detach(thread);
return 0;
}


static int ll_exit(lua_State *L) {
pthread_exit(NULL);
return 0;
}


static const struct luaL_Reg ll_funcs[] = {
{"start", ll_start},
{"send", ll_send},
{"receive", ll_receive},
{"exit", ll_exit},
{NULL, NULL}
};


int luaopen_lproc(lua_State *L) {
luaL_newlib(L, ll_funcs);
return 1;
}