-
Notifications
You must be signed in to change notification settings - Fork 653
Expand file tree
/
Copy pathcron.cpp
More file actions
151 lines (132 loc) · 5.31 KB
/
cron.cpp
File metadata and controls
151 lines (132 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include "server.h"
#include "cron.h"
// Destroys the cronjob payload held by a cron object.
// Only the payload is deleted here; the robj wrapper itself is not touched.
void freeCronObject(robj_roptr o)
{
    const cronjob *pjob = reinterpret_cast<const cronjob*>(ptrFromObj(o));
    delete pjob;
}
// CRON [name] [single shot] [optional: start] [delay] [script] [numkeys] [key N] [arg N]
//
// Stores a cronjob object under the key [name] and arms a subkey expire at
// (start + delay) which triggers the job's execution.
//
//   [single shot]  "single" or "repeat" (case insensitive).
//   [start]        optional integer; when omitted the server's cached mstime
//                  is used as the base time.
//   [delay]        integer interval, must be positive.
//   [script]       script text, later run through EVAL.
//   [numkeys] ...  optional; the first numkeys trailing arguments are stored
//                  as keys, everything after them as args.
//
// Replies with OK on success, or with an error when the arguments are
// malformed (unknown mode, non-integer/non-positive delay, missing script,
// negative or too large numkeys).
void cronCommand(client *c)
{
    static const int ARG_NAME = 1;
    static const int ARG_SINGLESHOT = 2;
    static const int ARG_EXPIRE = 3;

    // argv[ARG_SINGLESHOT], argv[ARG_EXPIRE] and argv[ARG_EXPIRE+1] are all
    // read unconditionally below, so make sure they exist.
    if (c->argc <= ARG_EXPIRE + 1) {
        addReply(c, shared.syntaxerr);
        return;
    }

    bool fSingleShot = false;
    if (strcasecmp("single", szFromObj(c->argv[ARG_SINGLESHOT])) == 0) {
        fSingleShot = true;
    } else if (strcasecmp("repeat", szFromObj(c->argv[ARG_SINGLESHOT])) != 0) {
        addReply(c, shared.syntaxerr);
        return;
    }

    long long interval;
    if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK)
        return;

    // If the argument after the first number is also a number, the first one
    // was the start time and the second one is the delay; every later
    // argument is then shifted by one position.
    int arg_offset = 0;
    long long base;
    __atomic_load(&g_pserver->mstime, &base, __ATOMIC_ACQUIRE);
    if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) {
        arg_offset = 1;
        std::swap(base, interval);
    }

    // Final positions of the remaining arguments, now that the offset is known.
    const int argScript = 4 + arg_offset;
    const int argNumkeys = 5 + arg_offset;
    const int argKeystart = 6 + arg_offset;

    if (interval <= 0)
    {
        addReplyError(c, "interval must be positive");
        return;
    }

    // With an explicit start time the script moves to index 5, which may not
    // exist (e.g. "CRON k single 100 200"); never index past argc.
    if (c->argc <= argScript) {
        addReply(c, shared.syntaxerr);
        return;
    }

    long numkeys = 0;
    if (c->argc > argNumkeys)
    {
        if (getLongFromObjectOrReply(c, c->argv[argNumkeys], &numkeys, NULL) != C_OK)
            return;
        // A negative count would make the args loop below start before the
        // key section (possibly at a negative index).
        if (numkeys < 0) {
            addReplyError(c, "Number of keys can't be negative");
            return;
        }
        // Compare against the number of arguments actually available after
        // the numkeys slot; written as a subtraction so a huge numkeys
        // cannot overflow the check.
        if (numkeys > c->argc - argKeystart) {
            addReplyError(c, "Missing arguments or numkeys is too big");
            return;
        }
    }

    std::unique_ptr<cronjob> spjob = std::make_unique<cronjob>();
    spjob->script = sdsstring(sdsdup(szFromObj(c->argv[argScript])));
    spjob->interval = static_cast<uint64_t>(interval);
    spjob->startTime = static_cast<uint64_t>(base);
    spjob->fSingleShot = fSingleShot;
    spjob->dbNum = dbnumFromDb(c->db);
    for (long i = 0; i < numkeys; ++i)
        spjob->veckeys.emplace_back(sdsdup(szFromObj(c->argv[argKeystart+i])));
    for (long i = argKeystart + numkeys; i < c->argc; ++i)
        spjob->vecargs.emplace_back(sdsdup(szFromObj(c->argv[i])));

    // Ownership of the job moves into the object stored at the key.
    robj *o = createObject(OBJ_CRON, spjob.release());
    setKey(c, c->db, c->argv[ARG_NAME], o);
    decrRefCount(o);

    // use an expire to trigger execution.  Note: We use a subkey expire here so legacy clients don't delete it.
    setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval);
    ++g_pserver->dirty;
    addReply(c, shared.ok);
}
void executeCronJobExpireHook(const char *key, robj *o)
{
serverAssert(o->type == OBJ_CRON);
cronjob *job = (cronjob*)ptrFromObj(o);
client *cFake = createClient(nullptr, IDX_EVENT_LOOP_MAIN);
cFake->lock.lock();
cFake->authenticated = 1;
cFake->user = nullptr;
selectDb(cFake, job->dbNum);
serverAssert(cFake->argc == 0);
// Setup the args for the EVAL command
cFake->cmd = lookupCommandByCString("EVAL");
cFake->argc = 3 + job->veckeys.size() + job->vecargs.size();
cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL);
cFake->argv[0] = createStringObject("EVAL", 4);
cFake->argv[1] = createStringObject(job->script.get(), job->script.size());
cFake->argv[2] = createStringObjectFromLongLong(job->veckeys.size());
for (size_t i = 0; i < job->veckeys.size(); ++i)
cFake->argv[3+i] = createStringObject(job->veckeys[i].get(), job->veckeys[i].size());
for (size_t i = 0; i < job->vecargs.size(); ++i)
cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size());
int lua_replicate_backup = g_pserver->lua_always_replicate_commands;
g_pserver->lua_always_replicate_commands = 0;
evalCommand(cFake);
g_pserver->lua_always_replicate_commands = lua_replicate_backup;
if (g_pserver->aof_state != AOF_OFF)
feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc);
// Active replicas do their own expiries, do not propogate
if (!g_pserver->fActiveReplica)
replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc);
resetClient(cFake);
robj *keyobj = createStringObject(key,sdslen(key));
int dbId = job->dbNum;
if (job->fSingleShot)
{
serverAssert(dbSyncDelete(cFake->db, keyobj));
}
else
{
job->startTime += job->interval;
mstime_t mstime;
__atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE);
if (job->startTime < (uint64_t)mstime)
{
// If we are more than one interval in the past then fast forward to
// the first interval still in the future. If startTime wasn't zero align
// this to the original startTime, if it was zero align to now
if (job->startTime == job->interval)
{ // startTime was 0
job->startTime = mstime + job->interval;
}
else
{
auto delta = mstime - job->startTime;
auto multiple = (delta / job->interval)+1;
job->startTime += job->interval * multiple;
}
}
setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime);
}
notifyKeyspaceEvent(NOTIFY_KEYEVENT, "CRON Executed", keyobj, dbId);
decrRefCount(keyobj);
// o is invalid at this point
freeClient(cFake);
}