当程序运行时,函数的局部变量是在线程的栈上进行分配的。虽然线程共享进程的虚拟地址空间,但因为每个线程有自己的线程栈,所以栈中的数据是互相隔离的,互不侵扰;而全局变量和静态变量分配在进程的数据段上,数据段(以及堆)在各个线程间是共享的,所以在对这些共享资源进行读写时,需要有同步机制来确保线程安全。然而有一种多线程下的编程方式,可以使得全局变量或静态变量只对单个线程可见,而对其它线程不可见,这就是Thread Local Storage,又叫线程本地存储或线程局部存储。

Thread Local Storage

维基百科上对Thread Local Storage的解释如下:

Thread-local storage (TLS) is a computer programming method that uses static or global memory local to a thread.

翻译下来就是:线程本地存储(TLS)是一种编程方法,它使用只对单个线程局部可见的静态或全局内存。

线程局部存储(TLS)是一个后来者,产生于多线程概念之后。在软件发展的早期,全局变量经常用在库函数中,用于存储全局信息,比如errno。多线程程序出现之后,全局变量errno就成为所有线程都共享的一个变量,而实际上,每个线程都想维护一份自己的errno,与其他线程隔离。这个时候,没人愿意去修改库函数的接口,于是线程局部存储就诞生了。它主要是为了避免多个线程同时访问同一全局变量或者静态变量时所导致的冲突,尤其是多个线程同时需要修改这一变量时;而这些变量逻辑上又可以在各个线程中独立,也就是说线程并不需要共享这些变量。

为了解决这个问题,我们可以通过TLS机制,为每一个使用该全局变量的线程都提供一个变量值的副本,每一个线程均可以独立地改变自己的副本,而不会和其它线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量。而从全局变量的角度上来看,就好像一个全局变量被克隆成了多份副本,而每一份副本都可以被一个线程独立地改变。

TLS简单使用

C语言实现

编写如下一段程序:

#define _MULTI_THREADED

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

__thread int TLS_data1;
__thread int TLS_data2;

//int TLS_data1;
//int TLS_data2;

#define NUMTHREADS 4


void *theThread(void *a) {
    int arg = *(int *)a;
    printf("Thread %lu before change: arg:%d TLS data=%d %d\n",
           pthread_self(), arg, TLS_data1, TLS_data2);
    TLS_data1 = arg;
    TLS_data2 = arg + 1;
    printf("Thread %lu after change: arg:%d TLS data=%d %d\n",
           pthread_self(), arg, TLS_data1, TLS_data2);
    return NULL;
}


int main(int argc, char **argv) {
    pthread_t thread[NUMTHREADS];
    int rc = 0;
    int i;

    int ar[NUMTHREADS];

    printf("Enter Testcase - %s\n", argv[0]);

    printf("Create/start threads\n");
    for (i = 0; i < NUMTHREADS; i++) {
        /* Create per-thread TLS data and pass it to the thread */
        ar[i] = i;
        rc = pthread_create(&thread[i], NULL, theThread, &ar[i]);
    }

    printf("Wait for the threads to complete, and release their resources\n");
    for (i = 0; i < NUMTHREADS; i++) {
        rc = pthread_join(thread[i], NULL);
    }
    printf("Main completed\n");
    return 0;
}

该段程序使用__thread声明了两个变量TLS_data1和TLS_data2为线程局部存储,然后分别在4个线程中修改它们的值,观察运行结果:

Create/start threads
Thread 139919563978496 before change: arg:0 TLS data=0 0
Thread 139919563978496 after change: arg:0 TLS data=0 1
Thread 139919547193088 before change: arg:2 TLS data=0 0
Thread 139919547193088 after change: arg:2 TLS data=2 3
Wait for the threads to complete, and release their resources
Thread 139919538800384 before change: arg:3 TLS data=0 0
Thread 139919538800384 after change: arg:3 TLS data=3 4
Thread 139919555585792 before change: arg:1 TLS data=0 0
Thread 139919555585792 after change: arg:1 TLS data=1 2
Main completed

可以看到每个线程都可以从容地修改它们,并且相互之间没有造成干扰。那么如果去掉__thread而使用普通的全局变量,这段程序就会变得线程不安全,下面是使用普通全局变量时的一次运行结果:

Create/start threads
Thread 140450580477696 before change: arg:0 TLS data=0 0
Thread 140450580477696 after change: arg:0 TLS data=0 1
Thread 140450572084992 before change: arg:1 TLS data=0 1
Thread 140450572084992 after change: arg:1 TLS data=1 2
Thread 140450563692288 before change: arg:2 TLS data=1 2
Thread 140450563692288 after change: arg:2 TLS data=2 3
Wait for the threads to complete, and release their resources
Thread 140450555299584 before change: arg:3 TLS data=2 3
Thread 140450555299584 after change: arg:3 TLS data=3 4
Main completed

可以试着删除__thread关键字,再编译运行观察,你会看到一个错乱的运行结果。

python实现

把上面的例子用python来实现:

import threading

local = threading.local()
local.TLS_data1 = 0
local.TLS_data2 = 0


def func(info):
    myname = threading.currentThread().getName()
    local.TLS_data1 = info
    local.TLS_data2 = info + 1
    print('Thread {0} after change TLS data: {1}, {2}'.format(myname, local.TLS_data1, local.TLS_data2))


t = [0, 1, 2, 3]
for i in t:
    t[i] = threading.Thread(target=func, args=[i])
    t[i].start()


for i, v in enumerate(t):
    v.join()


print('Thread {0} TLS data: {1}, {2}'.format("main", local.TLS_data1, local.TLS_data2))

执行结果:

Thread Thread-1 after change TLS data: 0, 1
Thread Thread-2 after change TLS data: 1, 2
Thread Thread-3 after change TLS data: 2, 3
Thread Thread-4 after change TLS data: 3, 4
Thread main TLS data: 0, 0

TLS的误区

网上有很多文章误将TLS当成是编写线程安全代码的银弹,其实并非如此,这取决于global variable在你的线程之间是不是需要共享:如果你的本意就是共享,那么TLS反而使你南辕北辙,你仍然需要mutex之类的锁去同步你的操作。那么TLS的本质到底是什么?

我认为TLS的本质就是填补了全局变量和局部变量之间的空白,它不像全局变量那样在多个线程中可见,也不像局部变量那样仅仅生存在函数的作用域之内,它的可见度,大于局部变量,又小于全局变量。

TLS适用场景

综上所述,线程本地存储并不是用来解决多线程共享变量的并发问题的,而是把变量限制为仅在当前线程中可见。可想而知,这样带来的好处之一就是线程内各个方法之间不用再通过传参就可以共享变量;另外一个使用场景是每个线程需要单独拥有一个实例的情况。

还有一个也是wiki上提到的,就是对一个global variable进行累加的情况:为了避免race condition,传统做法是使用mutex,但也可以使用TLS先在每个线程本地累加,然后再将每个线程的累加结果同步到一个真正的global variable之上。
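下面用一小段Go代码示意这种“先在线程本地累加、最后一次性汇总”的思路。需要说明的是,Go并没有__thread这样的线程本地存储关键字,这里用每个goroutine自己的局部变量来充当“本地副本”,仅作为思路示意,并非某个库的标准做法:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var (
        total int        // 真正的“全局”累加结果
        mu    sync.Mutex // 只在汇总时使用
        wg    sync.WaitGroup
    )

    for t := 0; t < 4; t++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            local := 0 // 每个goroutine私有的累加器,相当于TLS副本
            for i := 0; i < 1000; i++ {
                local++ // 本地累加,无需加锁
            }
            mu.Lock()
            total += local // 只在最后同步一次
            mu.Unlock()
        }()
    }
    wg.Wait()
    fmt.Println(total) // 4000
}

这样锁只在每个“线程”结束时竞争一次,而不是每次累加都要竞争。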

当然,我认为TLS的适用场景肯定远不止这些,只是我个人平时工作当中,编码并不是很多,其中多线程编程便又少了一些,而在多线程编程中使用TLS的情况更是为零。本篇文章仅当作学习过程中的记录,以后有更深层次的思考会随时补充,也希望大家可以共同探讨。

参考文章:

  1. Thread-local storage
  2. 线程局部存储漫谈
  3. A Deep dive into (implicit) Thread Local Storage

Docker的Daemon程序绑定到socket文件上(/var/run/docker.sock),而不是tcp端口。因此,默认情况下这个socket文件只能被root用户或者拥有sudo权限的用户访问。Docker daemon总是以root用户运行。

如果你不想总是在docker命令的前边加上sudo,那么可以创建一个名为docker的group,并且将你的用户加入到该组,那么docker daemon启动的时候会创建一个docker组成员可以访问的socket,例如:

ll /var/run/docker.sock 
srw-rw---- 1 root docker 0 Sep 24 11:24 /var/run/docker.sock

To create the docker group and add your user:

  1. Create the docker group.

    $ sudo groupadd docker
  2. Add your user to the docker group.

    $ sudo usermod -aG docker $USER
  3. Log out and log back in so that your group membership is re-evaluated.
    If testing on a virtual machine, it may be necessary to restart the virtual machine for changes to take effect.
    On a desktop Linux environment such as X Windows, log out of your session completely and then log back in.
    On Linux, you can also run the following command to activate the changes to groups:

    $ newgrp docker
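
重新登录(或执行newgrp docker)之后,可以简单验证一下不加sudo是否已经生效,例如:

    $ docker run hello-world

如果能正常拉取并运行hello-world镜像,说明当前用户已经可以直接访问docker daemon了。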

前几天go1.13发布,modules默认开启,从此modules转正成为golang官方原生的包依赖管理方式;除了modules,go1.13中还增加了新的语法,如二进制、八进制、十六进制字面量表示法,defer性能的增强,新的errors等等,社区已有很多相关特性的论述文章;此文仅简单讨论一下go1.13中modules的一些改变,毕竟包的管理跟我们日常开发是息息相关的,行文仓促,若有不当之处,希望读者斧正。

本文仅介绍modules在1.11/1.12/1.13等版本中的变化,并不介绍modules的使用,如需了解modules的详细使用方法,请参考官方文档或其他社区文章

Go Modules

modules 是 go1.11 推出的特性,官方称是 GOPATH 的替代品,是一个完整的支持包分发和版本控制的工具。使用modules,工作区不再局限于GOPATH之内,从而使构建更加可靠和可重复。modules在go1.11版本中仅仅是一个实验性的功能,紧接着在go1.12中得到了增强,而在刚刚发布的go1.13中得到了转正,GOPATH的作用进一步被弱化,Go Modules开始大规模使用。

两个模式

对于 modules 这种模式,官网有一个称呼是Module-aware,我不知道如何去翻译这个组合词;与之相对的,在Module-aware mode之前我们使用的包管理方式称为GOPATH mode。它们的区别如下:

  • GOPATH mode: go command从vendor或GOPATH下寻找依赖,依赖会被下载至GOPATH/src

  • Module-aware mode: go command不再考虑GOPATH,仅仅使用GOPATH/pkg/mod存储下载的依赖,并且是多版本并存

注意:Module-aware开启和关闭的情况下,go get 的使用方式不是完全相同的。在 modules 模式开启的情况下,可以通过在 package 后面添加 @version 来表明要升级(降级)到某个版本。如果没有指明 version,则默认先下载打了 tag 的 release 版本,比如 v0.4.5 或者 v1.2.3;如果没有 release 版本,则下载最新的 pre release 版本,比如 v0.0.1-pre1;如果还没有,则下载最新的 commit。这个地方给我们的一个启示是:如果我们不按规范的方式来命名 package 的 tag,则 modules 是无法管理的。version 的格式为 v(major).(minor).(patch)。

在 modules 开启的模式下,go get 还支持 version 模糊查询,比如 > v1.0.0 表示大于 v1.0.0 的可使用版本;< v1.12.0 表示小于 v1.12.0 版本下最近可用的版本。version 的比较规则按照 version 的各个字段来展开。
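
举几个例子(以下写法基于 go help modules 中描述的版本查询语法,包名与版本号仅作演示):

go get github.com/google/go-cmp@v0.3.0     # 升级(降级)到指定的release版本
go get github.com/google/go-cmp@latest     # 使用最新的可用版本
go get 'github.com/google/go-cmp@>=v0.2.0' # 模糊查询:不低于v0.2.0的最新可用版本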

除了指定版本,我们还可以使用如下命名使用最近的可行的版本:

  • go get -u 使用最新的 minor 或者 patch 版本

  • go get -u=patch 使用最新的 patch 版本

GO111MODULE

在 1.12 版本之前,使用 Go modules 之前需要环境变量 GO111MODULE:

  • GO111MODULE=off: 不使用 Module-aware mode。

  • GO111MODULE=on: 使用 Module-aware mode,不会去 GOPATH 下面查找依赖包。

  • GO111MODULE=auto或unset: Golang 自己检测是不是使用Module-aware mode。

go1.11时GO111MODULE=on有一个很不好的体验,就是go command依赖go.mod文件,也就是如果在module文件夹外使用go get等命令会报如下错误:

[root@k8s-node1 ~]# go get github.com/google/go-cmp
go: cannot find main module; see 'go help modules'
[root@k8s-node1 ~]# touch go.mod
[root@k8s-node1 ~]# go get github.com/google/go-cmp
go: cannot determine module path for source directory /root (outside GOPATH, no import comments)

这个情况在go1.12中得到了解决,可以在module directory之外使用go command。

但我个人比较喜欢不去设置GO111MODULE,根据官方描述在不设置GO111MODULE的情况下或者设为auto的时候,如果在当前目录或者父目录中有go.mod文件,那么就使用Module-aware mode, 而go1.12中,如果包位于GOPATH/src下,且GO111MODULE=auto, 即使有go.mod的存在,go仍然使用GOPATH mode:

[root@VM_0_6_centos test]# go get github.com/jlaffaye/ftp        
go get: warning: modules disabled by GO111MODULE=auto in GOPATH/src;
ignoring go.mod;
see 'go help modules'

这个现象在go1.13中又发生了改变。

modules in go1.13

GO111MODULE

The GO111MODULE environment variable continues to default to auto, but the auto setting now activates the module-aware mode of the go command whenever the current working directory contains, or is below a directory containing, a go.mod file — even if the current directory is within GOPATH/src.

Go 1.13 includes support for Go modules. Module-aware mode is active by default whenever a go.mod file is found in, or in a parent of, the current directory.

可见,modules 在 Go 1.13 的版本下是默认开启的,GOPATH的地位进一步被弱化。

GOPROXY

GOPROXY环境变量是伴随着modules而生的,在go1.13中得到了增强,可以设置为逗号分隔的url列表来指定多个代理,其默认值为https://proxy.golang.org,direct,也就是说google为我们维护了一个代理服务器,但是因为墙的存在,这个默认设置对中国的gopher并无卵用,应第一时间修改。

go命令在需要下载库包的时候将逐个试用设置中的各个代理,直到发现一个可用的为止。direct表示直连,所有direct之后的proxy都不会被使用,一个设置例子:

GOPROXY=https://proxy.golang.org,https://myproxy.mysite:8888,direct

GOPROXY环境变量可以帮助我们下载墙外的第三方库包,比较知名的中国区代理有goproxy.cn。当然,通过设置https_proxy环境变量也可以达到此目的。但是如果一个公司在内部架设一个自己的goproxy服务器来缓存第三方库包,库包下载速度会更快。可以感觉到module有一点maven的意思了,但是易用性上还有很长的路要走。

GOPRIVATE

使用GOPROXY可以获取公共的包,这些包在获取的时候会去https://sum.golang.org进行校验,这对中国的gopher来说又是一个比较坑的地方。Go为了安全性推出了Go checksum database(sumdb),对应的环境变量为GOSUMDB,go命令将在必要的时候连接此服务,检查下载的第三方依赖包的哈希是否和sumdb的记录相匹配。很遗憾,这个服务在中国也被墙了,可以选择设置为一个第三方的校验库,也可以更直接点,将GOSUMDB设为off关闭哈希校验,当然就不是很安全了。

除了public的包,在现实开发中我们更多的是使用很多private的包,因此就不适合走代理,所以go1.13推出了一个新的环境变量GOPRIVATE。它的值是一个以逗号分隔的列表,支持通配模式(语法遵循 Go 的 path.Match 包,类似glob,并非完整的正则)。在GOPRIVATE中设置的包不走proxy和checksum database,例如:

GOPRIVATE=*.corp.example.com,rsc.io/private

GONOSUMDB 和 GONOPROXY

这两个环境变量根据字面意思就能明白,是用来设置不进行校验和不走代理的包,设置方法同样是以逗号分隔的列表。
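
例如(示例值,写法与GOPRIVATE相同):

GONOPROXY=*.corp.example.com
GONOSUMDB=*.corp.example.com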

go env -w

可能是go也觉得环境变量有点多了,干脆为go env增加了一个选项-w,来设置全局环境变量,在Linux系统上我们可以这样用:

go env -w GOPROXY=https://goproxy.cn,direct
go env -w GOPRIVATE=*.gitlab.com,*.gitee.com
go env -w GOSUMDB=off

总结

从1.13中包管理的改变来看,有些乏善可陈,go的包管理很难让大多数开发者满意。当我看到越来越多的环境变量时,心里忍不住唾弃:使用这么多环境变量是一个多么蠢的方法。希望未来Go能给大家带来更好的包管理方式吧,就像Java的maven那样。

参考文献:

  1. Go 1.13 Release Notes
  2. Go 1.12 Release Notes
  3. Go 1.11 Release Notes
  4. Go Modules 不完全教程

%0A的问题

在使用Go语言的net/url包进行编码组装url的时候,遇到如下报错:

2019-07-31 16:55:46.850 ERROR   executor/driver_rollback.go:41  encounter an error:bad response code: 404
github.com/glory-cd/agent/executor.(*HttpFileHandler).Get
/home/liupeng/cdp/src/agent/executor/file_http.go:115
github.com/glory-cd/agent/executor.(*Client).Get
/home/liupeng/cdp/src/agent/executor/client.go:66
github.com/glory-cd/agent/executor.Get
/home/liupeng/cdp/src/agent/executor/filehandler.go:33
github.com/glory-cd/agent/executor.(*Roll).getCode
/home/liupeng/cdp/src/agent/executor/driver_rollback.go:77
github.com/glory-cd/agent/executor.(*Roll).Exec
/home/liupeng/cdp/src/agent/executor/driver_rollback.go:52
runtime.goexit
/usr/lib/go/src/runtime/asm_amd64.s:1337
the kv is: {"url":"http://admin:xxxx@192.168.1.75:32749/test/1.0.0/Gateway.zip%0A"}

可见,最终组装成的url末尾多了%0A,从而导致http请求返回404,那么%0A是怎么来的呢? 那就回溯一下url的组装过程吧。

我用来组装url的代码如下:

//创建url.URL
func (hu *HttpFileHandler) newPostUrl() string {
    requestURL := new(url.URL)

    requestURL.Scheme = "http"

    requestURL.User = url.UserPassword(hu.client.User, hu.client.Pass)

    requestURL.Host = hu.client.Addr

    requestURL.Path += hu.client.RelativePath

    return requestURL.String()
}

其中Path部分是使用hu.client.RelativePath进行拼接的,RelativePath的来源如下:

func (d *driver) readServiceVerion() (string, error) {
    versionFile := filepath.Join(d.Dir, common.PathFile)
    path, err := ioutil.ReadFile(versionFile)
    if err != nil {
        return "", errors.WithStack(err)
    }

    return string(path), nil
}

发现RelativePath是从文件中读取的,而我这个文件中只有一行内容。再结合url encode来看,这个%0A就是一个linefeed,即换行符,那么处理方法就简单了:返回的时候Trim一下,就可以解决换行和空格的问题。

return strings.TrimSpace(string(path)), nil

接下来再简单聊一下url encode

url encode


百分号编码(英语:Percent-encoding),又称URL编码(英语:URL encoding),是特定上下文的统一资源定位符(URL)的编码机制,实际上也适用于统一资源标志符(URI)的编码,也用于为application/x-www-form-urlencoded MIME类型准备数据,因为它用于通过HTTP的请求操作(request)提交HTML表单数据。


上面是维基百科对url编码的解释,通常如果一样东西需要编码,说明这样东西并不适合传输。原因多种多样,如Size过大,包含隐私数据,对于Url来说,之所以要进行编码,是因为Url中有些字符会引起歧义。

例如,Url参数字符串中使用key=value键值对这样的形式来传参,键值对之间以&符号分隔,如/s?q=abc&ie=utf-8。如果你的value字符串中包含了=或者&,那么势必会造成接收Url的服务器解析错误,因此必须将引起歧义的&和=符号进行转义,也就是对其进行编码。

又如,Url的编码格式采用的是ASCII码,而不是Unicode,这也就是说你不能在Url中包含任何非ASCII字符,例如中文;否则在客户端浏览器和服务端支持的字符集不同的情况下,中文可能会造成问题。

Url编码的原则就是使用安全的字符(没有特殊用途或者特殊意义的可打印字符)去表示那些不安全的字符。

语法构成

URI是统一资源标识符的意思,通常我们所说的URL只是URI的一种,典型URI的格式见下面的RFC引文。下面提到的URL编码,实际上应该指的是URI编码。

rfc3986中解释URI的构成如下:

The generic URI syntax consists of a hierarchical sequence of
components referred to as the scheme, authority, path, query, and
fragment.

   URI         = scheme ":" hier-part [ "?" query ] [ "#" fragment ]

   hier-part   = "//" authority path-abempty
               / path-absolute
               / path-rootless
               / path-empty

The following are two example URIs and their component parts:

   foo://example.com:8042/over/there?name=ferret#nose
   \_/   \______________/\_________/ \_________/ \__/
    |           |            |            |        |
 scheme     authority       path        query   fragment
    |   _____________________|__
   / \ /                        \
   urn:example:animal:ferret:nose

哪些字符需要编码

RFC3986文档规定,Url中只允许包含英文字母(a-zA-Z)、数字(0-9)、-_.~这4个特殊字符以及所有保留字符。RFC3986文档对Url的编解码问题做出了详细的建议,指出了哪些字符需要被编码才不会引起Url语义的转变,并对为什么这些字符需要编码做出了相应的解释。

US-ASCII字符集中没有对应的可打印字符:Url中只允许使用可打印字符。US-ASCII码中的00-1F以及7F字节表示控制字符,这些字符不能直接出现在Url中。同时,对于80-FF字节(ISO-8859-1),由于已经超出了US-ASCII定义的字节范围,因此也不可以放在Url中。

保留字符:Url可以划分成若干个组件,如协议、主机、路径等。有一些字符(:/?#[]@)是用作分隔不同组件的。例如:冒号用于分隔协议和主机,/用于分隔主机和路径,?用于分隔路径和查询参数,等等。还有一些字符(!$&'()*+,;=)用于在每个组件中起到分隔作用,如=用于表示查询参数中的键值对,&符号用于分隔查询的多个键值对。当组件中的普通数据包含这些特殊字符时,需要对其进行编码。

RFC3986中指定了以下字符为保留字符:! * ' ( ) ; : @ & = + $ , / ? # [ ]

不安全字符:还有一些字符,当他们直接放在Url中的时候,可能会引起解析程序的歧义。这些字符被视为不安全字符,原因有很多。

  • 空格:Url在传输的过程,或者用户在排版的过程,或者文本处理程序在处理Url的过程,都有可能引入无关紧要的空格,或者将那些有意义的空格给去掉。
  • 引号以及<>:引号和尖括号通常用于在普通文本中起到分隔Url的作用
  • #:通常用于表示书签或者锚点

  • %:百分号本身用作对不安全字符进行编码时使用的特殊字符,因此本身需要编码
  • {}|\^[]`~:某一些网关或者传输代理会篡改这些字符

需要注意的是,对于Url中的合法字符,编码和不编码是等价的,但是对于上面提到的这些字符,如果不经过编码,那么它们有可能会造成Url语义的不同。因此对于Url而言,只有普通英文字符和数字,特殊字符$-_.+!*'()还有保留字符,才能出现在未经编码的Url之中,其他字符均需要经过编码之后才能出现在Url中。


如何对url进行编码

Url编码通常也被称为百分号编码(Url Encoding,also known as percent-encoding),是因为它的编码方式非常简单:使用%百分号加上两位的字符——0123456789ABCDEF——代表一个字节的十六进制形式。Url编码默认使用的字符集是US-ASCII。例如a在US-ASCII码中对应的字节是0x61,那么Url编码之后得到的就是%61,我们在地址栏上输入http://g.cn/search?q=%61%62%63,实际上就等同于在google上搜索abc了。又如@符号在ASCII字符集中对应的字节为0x40,经过Url编码之后得到的是%40。

对于非ASCII字符,需要使用ASCII字符集的超集进行编码得到相应的字节,然后对每个字节执行百分号编码。对于Unicode字符,RFC文档建议使用utf-8对其进行编码得到相应的字节,然后对每个字节执行百分号编码。如"中文"使用UTF-8字符集得到的字节为0xE4 0xB8 0xAD 0xE6 0x96 0x87,经过Url编码之后得到"%E4%B8%AD%E6%96%87"。

如果某个字节对应着ASCII字符集中的某个非保留字符,则此字节无需使用百分号表示。例如"Url编码",使用UTF-8编码得到的字节是0x55 0x72 0x6C 0xE7 0xBC 0x96 0xE7 0xA0 0x81,由于前三个字节对应着ASCII中的非保留字符"Url",因此这三个字节可以用非保留字符"Url"表示。最终的Url编码可以简化成"Url%E7%BC%96%E7%A0%81",当然,如果你用"%55%72%6C%E7%BC%96%E7%A0%81"也是可以的。
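
用Go标准库net/url可以直接验证上面这些编码规则,下面是一个小示例(输出写在注释中):

package main

import (
    "fmt"
    "net/url"
)

func main() {
    // & 和 = 属于保留字符,作为数据出现时会被转义
    fmt.Println(url.QueryEscape("a&b=c")) // a%26b%3Dc
    // 非ASCII字符按UTF-8字节逐个做百分号编码
    fmt.Println(url.QueryEscape("中文")) // %E4%B8%AD%E6%96%87
    // 文末%0A问题的由来:换行符0x0A被编码为%0A
    fmt.Println(url.PathEscape("Gateway.zip\n")) // Gateway.zip%0A
}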

使用vim去除EOL

还可以使用vim去除文件末尾的换行

You can turn off the 'eol' option and turn on the 'binary' option to write
a file without the EOL at the end of the file:

:set binary
:set noeol
:w

什么是inode

理解inode,要从文件储存说起。

文件储存在硬盘上,硬盘的最小存储单位叫做"扇区"(Sector)。每个扇区储存512字节(相当于0.5KB)。

操作系统读取硬盘的时候,不会一个个扇区地读取,这样效率太低,而是一次性连续读取多个扇区,即一次性读取一个"块"(block)。这种由多个扇区组成的"块",是文件存取的最小单位。"块"的大小,最常见的是4KB,即连续八个 sector组成一个 block。

文件数据都储存在"块"中,那么很显然,我们还必须找到一个地方储存文件的元信息,比如文件的创建者、文件的创建日期、文件的大小等等。这种储存文件元信息的区域就叫做inode,中文译名为"索引节点"。

每一个文件都有对应的inode,里面包含了与该文件有关的一些信息。

inode的内容

inode包含文件的元信息,具体来说有以下内容:

  • 文件的字节数
  • 文件拥有者的User ID
  • 文件的Group ID
  • 文件的读、写、执行权限
  • 文件的时间戳
  • 链接数,即有多少文件名指向这个inode
  • 文件数据block的位置

使用stat命令查看某个文件的inode信息

$ stat proxy.sh
  File: proxy.sh
  Size: 89              Blocks: 8          IO Block: 4096   regular file
Device: 802h/2050d      Inode: 9714171     Links: 1
Access: (0755/-rwxr-xr-x)  Uid: ( 1000/ liupeng)   Gid: ( 1000/ liupeng)
Access: 2019-07-29 10:25:57.880429139 +0800
Modify: 2019-07-29 10:25:57.880429139 +0800
Change: 2019-07-29 10:26:05.910683677 +0800
 Birth: 2019-07-29 10:25:57.880429139 +0800

inode的大小

inode也会消耗硬盘空间,所以硬盘格式化的时候,操作系统自动将硬盘分成两个区域。一个是数据区,存放文件数据;另一个是inode区(inode table),存放inode所包含的信息。

每个inode节点的大小一般是128字节或256字节,甚至可以手动指定到2K;inode节点的总数在格式化时就给定。以ext3/ext4为例:

  • 每个 inode 大小为 256byte,block 大小为 4k byte;
  • 根据 block count 和 inode count,我们也可以算出 16k bytes-per-inode(15728384*4096/3932160)

也就是文件系统在创建的时候每16k空间自动划分一个inode,如果你需要存储的是大量的小文件,那么你应该在格式化分区的时候手动修改bytes-per-inode的值,例如:

mkfs.ext4 -i 8192 /dev/sda1

而在xfs文件系统中-i inode_options中的maxpct=value描述如下:

This specifies the maximum percentage of space in the filesystem that can be allocated to inodes. The default value is 25% for filesystems under 1TB, 5% for filesystems under 50TB and 1% for filesystems over 50TB.

可见默认情况下xfs文件系统允许分配给inode的空间上限要比ext文件系统大得多(xfs的inode是按需动态分配的)。

可以使用df -i查看inode的数量和使用率:

$ df -i
Filesystem Inodes IUsed IFree IUse% Mounted on
dev 1015658 414 1015244 1% /dev
run 1017797 680 1017117 1% /run
/dev/sda2 14057472 1003259 13054213 8% /
tmpfs 1017797 129 1017668 1% /dev/shm
tmpfs 1017797 18 1017779 1% /sys/fs/cgroup
tmpfs 1017797 629 1017168 1% /tmp
/dev/sda1 0 0 0 - /boot/efi
tmpfs 1017797 37 1017760 1% /run/user/1000

inode号

每个inode都有一个号码,操作系统用inode号码来识别不同的文件。

这里值得重复一遍,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。对于系统来说,文件名只是inode号码便于识别的别称或者绰号。

表面上,用户通过文件名,打开文件。实际上,系统内部这个过程分成三步:首先,系统找到这个文件名对应的inode号码;其次,通过inode号码,获取inode信息;最后,根据inode信息,找到文件数据所在的block,读出数据。

使用ls -i命令,可以看到文件名对应的inode号码:

$ ls -i proxy.sh 
9714171 proxy.sh
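
除了ls -i,也可以用一小段Go程序取到inode号(示意代码,仅适用于Linux,文件名proxy.sh沿用上面的例子):

package main

import (
    "fmt"
    "os"
    "syscall"
)

func main() {
    fi, err := os.Stat("proxy.sh")
    if err != nil {
        panic(err)
    }
    // 在Linux上,FileInfo.Sys()底层是*syscall.Stat_t,其中Ino字段就是inode号
    if st, ok := fi.Sys().(*syscall.Stat_t); ok {
        fmt.Println("inode:", st.Ino)
    }
}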

目录文件

Unix/Linux系统中,目录(directory)也是一种文件。打开目录,实际上就是打开目录文件。

目录文件的结构非常简单,就是一系列目录项(dirent)的列表。每个目录项,由两部分组成:所包含文件的文件名,以及该文件名对应的inode号码。

查看目录的inode

通过介绍我们知道,通常情况下每个文件对应一个inode,那么如果想查找某个目录使用的inode数量,则可以使用如下命令:

clear;echo "Detailed Inode usage: $(pwd)" ; for d in `find -maxdepth 1 -type d |cut -d\/ -f2 |grep -xv . |sort`; do c=$(find $d |wc -l) ; printf "$c\t\t- $d\n" ; done ; printf "Total: \t\t$(find $(pwd) | wc -l)\n"

输出如下:

Detailed Inode usage: /home/liupeng/liupzmin.github.io
312 - .git
8049 - node_modules
294 - public
4 - scaffolds
45 - source
409 - themes
Total: 9120

要清理inode,只要找到包含大量文件的目录删除之即可。

参考文献:

  1. 理解inode
  2. How to find the INODE usage on Linux

Golang提供了几个包可以将文件压缩为不同的类型,这篇博客主要展示一下archive/zip这个包的用法,如何将文件或文件夹压缩为zip格式,以及如何进行解压缩。

Compressing

usage

zipit("/tmp/documents", "/tmp/backup.zip", "*.log")
zipit("/tmp/report.txt", "/tmp/report-2015.zip", "*.log")

func Zipit

//压缩为zip格式
//source为要压缩的文件或文件夹, 绝对路径和相对路径都可以
//target是目标文件
//filter是过滤模式(语法同 Golang 的 path.Match 包)
func Zipit(source, target, filter string) error {
    var err error
    if isAbs := filepath.IsAbs(source); !isAbs {
        source, err = filepath.Abs(source) // 将传入路径直接转化为绝对路径
        if err != nil {
            return errors.WithStack(err)
        }
    }
    //创建zip包文件
    zipfile, err := os.Create(target)
    if err != nil {
        return errors.WithStack(err)
    }

    defer func() {
        if err := zipfile.Close(); err != nil {
            log.Slogger.Errorf("*File close error: %s, file: %s", err.Error(), zipfile.Name())
        }
    }()

    //创建zip.Writer
    zw := zip.NewWriter(zipfile)

    defer func() {
        if err := zw.Close(); err != nil {
            log.Slogger.Errorf("zipwriter close error: %s", err.Error())
        }
    }()

    info, err := os.Stat(source)
    if err != nil {
        return errors.WithStack(err)
    }

    var baseDir string
    if info.IsDir() {
        baseDir = filepath.Base(source)
    }

    err = filepath.Walk(source, func(path string, info os.FileInfo, err error) error {

        if err != nil {
            return errors.WithStack(err)
        }

        //将遍历到的路径与pattern进行匹配
        ism, err := filepath.Match(filter, info.Name())

        if err != nil {
            return errors.WithStack(err)
        }
        //如果匹配就忽略
        if ism {
            return nil
        }
        //创建文件头
        header, err := zip.FileInfoHeader(info)
        if err != nil {
            return errors.WithStack(err)
        }

        if baseDir != "" {
            header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, source))
        }

        if info.IsDir() {
            header.Name += "/"
        } else {
            header.Method = zip.Deflate
        }
        //写入文件头信息
        writer, err := zw.CreateHeader(header)
        if err != nil {
            return errors.WithStack(err)
        }

        if info.IsDir() {
            return nil
        }
        //写入文件内容
        file, err := os.Open(path)
        if err != nil {
            return errors.WithStack(err)
        }

        defer func() {
            if err := file.Close(); err != nil {
                log.Slogger.Errorf("*File close error: %s, file: %s", err.Error(), file.Name())
            }
        }()
        _, err = io.Copy(writer, file)

        return errors.WithStack(err)
    })

    if err != nil {
        return errors.WithStack(err)
    }

    return nil
}

Extracting

usage

unzip("/tmp/report-2015.zip", "/tmp/reports/")

func Unzip

//解压zip
func Unzip(archive, target string) error {
    reader, err := zip.OpenReader(archive)
    if err != nil {
        return errors.WithStack(err)
    }

    if err := os.MkdirAll(target, 0755); err != nil {
        return errors.WithStack(err)
    }

    for _, file := range reader.File {
        unzippath := filepath.Join(target, file.Name)
        if file.FileInfo().IsDir() {
            err := os.MkdirAll(unzippath, file.Mode())
            if err != nil {
                return errors.WithStack(err)
            }
            continue
        }

        fileReader, err := file.Open()
        if err != nil {
            return errors.WithStack(err)
        }
        defer fileReader.Close()

        targetFile, err := os.OpenFile(unzippath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, file.Mode())
        if err != nil {
            return errors.WithStack(err)
        }
        defer targetFile.Close()

        if _, err := io.Copy(targetFile, fileReader); err != nil {
            return errors.WithStack(err)
        }
    }

    return nil
}

参考文献:

  1. Golang: Working with ZIP archives

Passing arguments to ... parameters

If f is variadic with a final parameter p of type ...T, then within f the type of p is equivalent to type []T. If f is invoked with no actual arguments for p, the value passed to p is nil. Otherwise, the value passed is a new slice of type []T with a new underlying array whose successive elements are the actual arguments, which all must be assignable to T. The length and capacity of the slice is therefore the number of arguments bound to p and may differ for each call site.

Given the function and calls

func Greeting(prefix string, who ...string)
Greeting("nobody")
Greeting("hello:", "Joe", "Anna", "Eileen")

within Greeting, who will have the value nil in the first call, and []string{"Joe", "Anna", "Eileen"} in the second.

If the final argument is assignable to a slice type []T, it may be passed unchanged as the value for a ...T parameter if the argument is followed by .... In this case no new slice is created.

Given the slice s and call

s := []string{"James", "Jasmine"}
Greeting("goodbye:", s...)

within Greeting, who will have the same value as s with the same underlying array.

上面是官方文档中关于...用法的描述。在Go中,...主要有以下几种使用形式:

参数列表

如果最后一个函数参数的类型的是...T,那么在调用这个函数的时候,我们可以在参数列表的最后使用若干个类型为T的参数。这里,...T在函数内部的类型实际是[]T,是隐式创建的slice:

func Sum(nums ...int) int {
    res := 0
    for _, n := range nums {
        res += n
    }
    return res
}

Sum(1,2,3)

slice参数

如果传入的最后一个函数参数是slice []T,直接在slice后跟...,那么就不会创建新的slice:

primes := []int{2, 3, 5, 7}
fmt.Println(Sum(primes...)) // 17

因此,当我们想把一个slice追加到另外一个slice时,可以使用如下方式:

s1 := []int{0, 1, 2, 3}
s2 := []int{4, 5, 6, 7}
s1 = append(s1, s2...) // instead of FOR
fmt.Println(s1)

或者,当我们想在slice中删除一个元素时:

s1 := []int{0, 1, 2, 3}
for index := range s1 {
    if index == 2 {
        s1 = append(s1[:index], s1[index+1:]...)
        break
    }
}

标识数组元素个数

默认情况下,数组的每个元素都被初始化为元素类型对应的零值,对于数字类型来说就是0。我们也可以使用数组字面值语法用一组值来初始化数组:

var q [3]int = [3]int{1, 2, 3}
var r [3]int = [3]int{1, 2}
fmt.Println(r[2]) // "0"

在数组字面值中,如果在数组的长度位置出现的是“...”省略号,则表示数组的长度是根据初始化值的个数来计算。因此,上面q数组的定义可以简化为

q := [...]int{1, 2, 3}
fmt.Printf("%T\n", q) // "[3]int"

Go命令行中的通配符

描述包文件的通配符。
在这个例子中,会对当前目录和所有子目录下的所有包执行单元测试:

go test ./...

参考文献:

  1. golang 三个点(three dots)的用法
  2. Passing arguments to ... parameters

Cannot run program "python3": error=2, 没有那个文件或目录

spark2中默认使用的是python2,可以通过以下三种方式之一使用python3:

  1. PYSPARK_PYTHON=python3 pyspark2
  2. 修改~/.bash_profile,增加 PYSPARK_PYTHON=python3
  3. 修改spark-env.sh增加PYSPARK_PYTHON=/usr/local/bin/python3

如果使用前2种不带绝对路径的变量声明,可能会遇到Cannot run program "python3": error=2, 没有那个文件或目录错误。原因是我的spark环境默认运行在yarn上,当执行RDD任务时会在其他节点报错:

[root@hadoop-04 ~]# PYSPARK_PYTHON=python3 pyspark2
Python 3.6.4 (default, Mar 21 2018, 13:55:56)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.2.0.cloudera2
/_/

Using Python version 3.6.4 (default, Mar 21 2018 13:55:56)
SparkSession available as 'spark'.
>>> lines = sc.textFile('/afis/flume/auth/2018/03/16/auth.1521129675887.log')
>>> pythonlines = lines.filter(lambda line:"python" in line)
>>> pythonlines.count()
[Stage 0:> (0 + 2) / 2]18/03/22 13:25:22 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop-02, executor 2): java.io.IOException: Cannot run program "python3": error=2, 没有那个文件或目录
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163)
at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:89)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:128)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, 没有那个文件或目录
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 14 more
......
>>> 18/03/22 13:25:23 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, hadoop-03, executor 1): TaskKilled (stage cancelled)

hadoop-02这个节点上找不到python3导致任务终止,既然提示在其他节点上找不到,那在本地节点运行会是哪种结果呢?

[root@hadoop-04 ~]# PYSPARK_PYTHON=python3 pyspark2 --master local
Python 3.6.4 (default, Mar 21 2018, 13:55:56)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.2.0.cloudera2
/_/

Using Python version 3.6.4 (default, Mar 21 2018 13:55:56)
SparkSession available as 'spark'.
>>> lines = sc.textFile('/afis/flume/auth/2018/03/16/auth.1521129675887.log')
>>> pythonlines = lines.filter(lambda line:"python" in line)
>>> pythonlines.count()
0

可见在本地运行是没有问题的,那问题就出在python3的可执行文件少了绝对路径,猜测是spark内部的任务调度执行的时候没有使用操作系统的PATH导致找不到可执行文件,现在把python3的可执行文件路径补全:

[root@hadoop-04 ~]# PYSPARK_PYTHON=/usr/local/bin/python3 pyspark2
Python 3.6.4 (default, Mar 21 2018, 13:55:56)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/03/22 14:54:52 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.2.0.cloudera2
/_/

Using Python version 3.6.4 (default, Mar 21 2018 13:55:56)
SparkSession available as 'spark'.
>>> lines = sc.textFile('/afis/flume/auth/2018/03/16/auth.1521129675887.log')
>>> pythonlines = lines.filter(lambda line:"python" in line)
>>> pythonlines.count()
0
>>> pythonlines = lines.filter(lambda line:"SessionTask" in line)
>>> pythonlines.count()
719

可见,要在spark2上使用python3需要设置PYSPARK_PYTHON为可执行文件的绝对路径,优先推荐设置spark-env.sh

[root@hadoop-01 ~]# more /etc/spark2/conf/spark-env.sh 
#!/usr/bin/env bash
##
# Generated by Cloudera Manager and should not be modified directly
##

SELF="$(cd $(dirname $BASH_SOURCE) && pwd)"
if [ -z "$SPARK_CONF_DIR" ]; then
export SPARK_CONF_DIR="$SELF"
fi

export SPARK_HOME=/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2
export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.14.0-1.cdh5.14.0.p0.24/lib/hadoop
export PYSPARK_PYTHON=/usr/local/bin/python3

1. 几种方式

目前要把kafka中的数据传输到elasticsearch集群,大概有以下几种方法:

  • logstash

  • flume

  • spark streaming

  • kafka connect

  • 自己开发程序读取kafka写入elastic

其中logstash网上有人说不太稳定,且我目前用过的2.3版本确实经常出现crash的情况,所以一开始并未考虑;首先尝试的是通过flume到es,因为目前kafka到HDFS中间用的是flume,想再加一个通道和sink到es,而且flume也有es的sink,但是我的flume是最新版1.8,elasticsearch也是最新版6.2.2,中间碰到了兼容性问题,未能成功;转而去研究kafka connect,按照《kafka权威指南》上的例子研究了一下,同样遇到兼容性问题,在我的版本组合中无法奏效,但我不想去修改已经安装好的flume或者es集群;spark streaming过于复杂,自己开发程序成本过高、且周期较长;最终去尝试logstash,结果配置非常容易,简单奏效,稳定性问题暂时无法看出,留待日后详测,现记录一下配置。

2. logstash配置

2.1 kafka input plugin

input{
    kafka{
        bootstrap_servers => ["192.168.1.120:9092,192.168.1.121:9092,192.168.1.122:9092"]
        client_id => "test"
        group_id => "logstash-es"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => "true"
        topics => ["auth","base","clearing","trademgt"]
        type => "kafka-to-elas"
    }
}

其中配置项decorate_events 颇为有用,如果只用了单个logstash,订阅了多个主题,你肯定希望在es中为不同主题创建不同的索引,那么decorate_events 就是你想要的,看官方解释:

decorate_events

  • Value type is boolean
  • Default value is false

Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes: topic: The topic this message is associated with consumer_group: The consumer group used to read in this event partition: The partition this message is associated with offset: The offset from the partition this message is associated with key: A ByteBuffer containing the message key

大意是指定这个选项为true时,会附加kafka的一些信息到logstash event的一个名为kafka的域中,例如topic、消息大小、偏移量、consumer_group等,具体如下:

Metadata fields

The following metadata from Kafka broker are added under the [@metadata] field:

  • [@metadata][kafka][topic]: Original Kafka topic from where the message was consumed.
  • [@metadata][kafka][consumer_group]: Consumer group
  • [@metadata][kafka][partition]: Partition info for this message.
  • [@metadata][kafka][offset]: Original record offset for this message.
  • [@metadata][kafka][key]: Record key, if any.
  • [@metadata][kafka][timestamp]: Timestamp when this message was received by the Kafka broker.

Please note that @metadata fields are not part of any of your events at output time. If you need these information to be inserted into your original event, you’ll have to use the mutate filter to manually copy the required fields into your event.

值得注意的是,在output的时候这些域的元数据信息并不是event的一部分,如果希望这些元数据插入到原生的event中,就需要利用mutate手动copy进event,我们接下来会在filter中利用kafka域的内容构建自定义的域。

2.2 构建[@metadata][index]

filter {
    if [@metadata][kafka][topic] == "auth" {
        mutate {
            add_field => {"[@metadata][index]" => "auth-%{+YYYY.MM.dd}"}
        }
    } else if [@metadata][kafka][topic] == "base" {
        mutate {
            add_field => {"[@metadata][index]" => "base-%{+YYYY.MM.dd}"}
        }
    } else if [@metadata][kafka][topic] == "clearing" {
        mutate {
            add_field => {"[@metadata][index]" => "clearing-%{+YYYY.MM.dd}"}
        }
    } else {
        mutate {
            add_field => {"[@metadata][index]" => "trademgt-%{+YYYY.MM.dd}"}
        }
    }
    # remove the field containing the decorations, unless you want them to land into ES
    mutate {
        remove_field => ["kafka"]
    }
}

为每个主题构建对应的[@metadata][index],并在接下来output中引用

2.3 elasticsearch output plugin

output{
    elasticsearch{
        hosts => ["192.168.1.123:9200","192.168.1.122:9200","192.168.1.121:9200"]
        index => "%{[@metadata][index]}"
        timeout => 300
    }
}

启动logstash就可以在kibana中观察到数据了。

3. 特殊的metadata

The @metadata field

In Logstash 1.5 and later, there is a special field called @metadata. The contents of @metadata will not be part of any of your events at output time, which makes it great to use for conditionals, or extending and building event fields with field reference and sprintf formatting.

The following configuration file will yield events from STDIN. Whatever is typed will become the message field in the event. The mutate events in the filter block will add a few fields, some nested in the @metadata field.

input { stdin { } }

filter {
    mutate { add_field => { "show" => "This data will be in the output" } }
    mutate { add_field => { "[@metadata][test]" => "Hello" } }
    mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
    if [@metadata][test] == "Hello" {
        stdout { codec => rubydebug }
    }
}

Let’s see what comes out:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:42:51.496Z,
"@version" => "1",
"host" => "example.com",
"show" => "This data will be in the output",
"message" => "asdf"
}

The "asdf" typed in became the message field contents, and the conditional successfully evaluated the contents of the test field nested within the @metadata field. But the output did not show a field called @metadata, or its contents.

The rubydebug codec allows you to reveal the contents of the @metadata field if you add a config flag, metadata => true:

stdout { codec => rubydebug { metadata => true } }

Let’s see what the output looks like with this change:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
"@timestamp" => 2016-06-30T02:46:48.565Z,
"@metadata" => {
"test" => "Hello",
"no_show" => "This data will not be in the output"
},
"@version" => "1",
"host" => "example.com",
"show" => "This data will be in the output",
"message" => "asdf"
}

Now you can see the @metadata field and its sub-fields.

Important

Only the rubydebug codec allows you to show the contents of the @metadata field.

Make use of the @metadata field any time you need a temporary field but do not want it to be in the final output.

Perhaps one of the most common use cases for this new field is with the date filter and having a temporary timestamp.

This configuration file has been simplified, but uses the timestamp format common to Apache and Nginx web servers. In the past, you’d have to delete the timestamp field yourself, after using it to overwrite the @timestamp field. With the @metadata field, this is no longer necessary:

input { stdin { } }

filter {
    grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
    date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
    stdout { codec => rubydebug }
}

Notice that this configuration puts the extracted date into the [@metadata][timestamp] field in the grok filter. Let’s feed this configuration a sample date string and see what comes out:

$ bin/logstash -f ../test.conf
Pipeline main started
02/Mar/2014:15:36:43 +0100
{
"@timestamp" => 2014-03-02T14:36:43.000Z,
"@version" => "1",
"host" => "example.com",
"message" => "02/Mar/2014:15:36:43 +0100"
}

That’s it! No extra fields in the output, and a cleaner config file because you do not have to delete a "timestamp" field after conversion in the date filter.

Another use case is the CouchDB Changes input plugin (See https://github.com/logstash-plugins/logstash-input-couchdb_changes). This plugin automatically captures CouchDB document field metadata into the @metadata field within the input plugin itself. When the events pass through to be indexed by Elasticsearch, the Elasticsearch output plugin allows you to specify the action(delete, update, insert, etc.) and the document_id, like this:

output {
    elasticsearch {
        action => "%{[@metadata][action]}"
        document_id => "%{[@metadata][_id]}"
        hosts => ["example.com"]
        index => "index_name"
        protocol => "http"
    }
}

利用DBMS_SCHEDULER在Oracle 11gR2 RAC上执行rman备份

RMAN backup in Oracle 11gR2 RAC is exactly the same as RMAN backup in Oracle 11gR2 single node.
The only difference is: typically, in the case of an Oracle single-node database, we schedule RMAN scripts with the help of a CRON job and it runs at our convenience; but in the case of Oracle RAC, if we schedule the RMAN script on one node and unfortunately that RAC node goes down (the one where we configured the RMAN scripts), then the RMAN backup obviously won't run.

So the same strategy will not work for Oracle RAC nodes. For consistent RMAN backups use dbms_scheduler, and we need to place the RMAN scripts in a shared directory (or, in my case, I have created identical scripts on both cluster nodes).

注意: 需要将脚本放在共享位置或者每个节点的相同位置

看一下rman的备份脚本,此脚本将备份放在ASM中,将日志放在节点本地

################################################################
# rman_backup_rac.sh FOR RAC #
# created by lp #
# 2017/03/22 #
# usage: rman_backup_rac.sh <$BACKUP_LEVEL> #
# BACKUP_LEVEL: #
# F: full backup #
# 0: level 0 #
# 1: level 1 #
################################################################


#!/bin/bash
# User specific environment and startup programs
export ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1
export RMAN_BAK_LOG_BASE=/home/oracle/DbBackup
export RMAN_BAK_DATA_BASE=+data/NXRAC/backup
export ORACLE_SID=`ps -ef|grep pmon|grep -v ASM|grep -v grep|awk -F'_' '{print $NF}'`
export TIMESTAMP=`date +%Y%m%d%H%M`;

#the destination of rman backuppiece
export RMAN_DATA=${RMAN_BAK_DATA_BASE}/rman

#the destination of rman backup logs
export RMAN_LOG=${RMAN_BAK_LOG_BASE}/logs


if [[ ! -z $1 ]] && echo $1 |grep -Ew "[01F]" >/dev/null 2>&1
then
export RMAN_LEVEL=${1}

# Check rman level
if [ "$RMAN_LEVEL" == "F" ];
then unset INCR_LVL
BACKUP_TYPE=full
else
INCR_LVL="INCREMENTAL LEVEL ${RMAN_LEVEL}"
BACKUP_TYPE=lev${RMAN_LEVEL}
fi
else
echo "${1} wrong argument!" >${RMAN_LOG}/wrong_argument_${TIMESTAMP}.log
exit 1
fi





#the prefix of rman backuppiece
export RMAN_FILE=${RMAN_DATA}/${BACKUP_TYPE}_${TIMESTAMP}

#the logfile of shell script including the rman logs contents
export SSH_LOG=${RMAN_LOG}/${BACKUP_TYPE}_${TIMESTAMP}.log

#the size of backuppiece
export MAXPIECESIZE=4G

####################################################################
# #
# the name of rman logs excluding the file expanded-name, #
# when the shell is complete,the content of this file will be #
# appended to the $SSH_LOG and the rman logfile will be deleted. #
# #
####################################################################
export RMAN_LOG_FILE=${RMAN_LOG}/${BACKUP_TYPE}_${TIMESTAMP}_1


#Check RMAN Backup Path

if ! test -d ${RMAN_LOG}
then
mkdir -p ${RMAN_LOG}
fi

echo "---------------------------------" >>${SSH_LOG}
echo " " >>${SSH_LOG}
echo "Rman Begin to Working ........." >>${SSH_LOG}
echo "Begin time at:" `date` --`date +%Y%m%d%H%M` >>${SSH_LOG}

#Startup rman to backup

$ORACLE_HOME/bin/rman log=${RMAN_LOG_FILE}.log <<EOF
connect target /
run {
CONFIGURE RETENTION POLICY TO RECOVERY WINDOW OF 7 DAYS;
CONFIGURE BACKUP OPTIMIZATION ON;
CONFIGURE CONTROLFILE AUTOBACKUP ON;
CONFIGURE DEVICE TYPE DISK PARALLELISM 4 BACKUP TYPE TO BACKUPSET;
CONFIGURE CONTROLFILE AUTOBACKUP FORMAT FOR DEVICE TYPE DISK TO '${RMAN_DATA}/control_auto_%F';
ALLOCATE CHANNEL 'ch1' TYPE DISK maxpiecesize=${MAXPIECESIZE} CONNECT 'SYS/passwd@node1';
ALLOCATE CHANNEL 'ch2' TYPE DISK maxpiecesize=${MAXPIECESIZE} CONNECT 'SYS/passwd@node1';
ALLOCATE CHANNEL 'ch3' TYPE DISK maxpiecesize=${MAXPIECESIZE} CONNECT 'SYS/passwd@mode2';
ALLOCATE CHANNEL 'ch4' TYPE DISK maxpiecesize=${MAXPIECESIZE} CONNECT 'SYS/passwd@node2';
CROSSCHECK ARCHIVELOG ALL;
DELETE NOPROMPT OBSOLETE;
DELETE NOPROMPT EXPIRED BACKUP;
BACKUP AS COMPRESSED BACKUPSET
${INCR_LVL}
DATABASE FORMAT '${RMAN_FILE}_db_%U' TAG '${BACKUP_TYPE}_${TIMESTAMP}';
SQL 'ALTER SYSTEM ARCHIVE LOG CURRENT';
BACKUP FILESPERSET 20 ARCHIVELOG ALL FORMAT '${RMAN_FILE}_arc_%U' TAG '${ORACLE_SID}_arc_${TIMESTAMP}'
DELETE INPUT;
RELEASE CHANNEL ch1;
RELEASE CHANNEL ch2;
RELEASE CHANNEL ch3;
RELEASE CHANNEL ch4;
ALLOCATE CHANNEL ch00 TYPE DISK;
BACKUP
FORMAT '${RMAN_DATA}/cntrl_%U'
CURRENT CONTROLFILE;
RELEASE CHANNEL ch00;
}
exit;
EOF

RC=$?

cat ${RMAN_LOG_FILE}.log >>${SSH_LOG}
echo "Rman Stop working @ time:"`date` `date +%Y%m%d%H%M` >>${SSH_LOG}

if [ $RC -ne "0" ]; then
echo "------ error ------" >>${SSH_LOG}
else
echo "------ Success during RMAN backup peroid------" >>${SSH_LOG}
rm -rf ${RMAN_LOG_FILE}.log
fi

exit

DBMS_SCHEDULER:

Here we are using DBMS_SCHEDULER instead of DBMS_JOB, because DBMS_SCHEDULER is RAC aware.

Before jumping into the real DBMS_SCHEDULER configuration, we need to focus on one important thing:

Both RAC nodes' local time zones must be identical to the DBMS_SCHEDULER default time zone.

On all RAC nodes, check the local time zone and set it accordingly.

[oracle@node2 ]$ cat /etc/sysconfig/clock
ZONE="Asia/Shanghai"

configure default time zone for DBMS_SCHEDULER

SQL> select value from dba_scheduler_global_attribute where attribute_name = 'DEFAULT_TIMEZONE';

VALUE
--------------------------------------------------------------------------------
PRC

SQL> exec dbms_scheduler.set_scheduler_attribute ('DEFAULT_TIMEZONE', 'Asia/Shanghai');

PL/SQL procedure successfully completed.

SQL> select value from dba_scheduler_global_attribute where attribute_name = 'DEFAULT_TIMEZONE';

VALUE
--------------------------------------------------------------------------------
Asia/Shanghai

Now we need to create a credential that is assigned to DBMS_SCHEDULER jobs so that they can authenticate with a local/remote host operating system or a remote Oracle database.

SQL> exec dbms_scheduler.create_credential(credential_name => 'oracle', username => 'oracle', password => 'oracle');

PL/SQL procedure successfully completed.

Now it's time to create a DBMS_SCHEDULER job for the RMAN incremental level 0 backup. Here in this procedure I am going to create the RMAN_INC0_BACKUP job with the required attributes.

begin
dbms_scheduler.create_job(
job_name => 'RMAN_INC0_BACKUP',
job_type => 'EXECUTABLE',
job_action => '/bin/sh',
number_of_arguments => 2,
start_date => SYSTIMESTAMP,
credential_name => 'oracle',
auto_drop => FALSE,
enabled => FALSE);
end;
/

Set argument_position & argument_value ( i.e. Path of the RMAN script ) for the same job:

begin
dbms_scheduler.set_job_argument_value(
job_name => 'RMAN_INC0_BACKUP',
argument_position => 1,
argument_value => '/home/oracle/rman.sh');
end;
/

begin
dbms_scheduler.set_job_argument_value(
job_name => 'RMAN_INC0_BACKUP',
argument_position => 2,
argument_value => 0);
end;
/

Set start_date for the same job. In my case the RMAN_INC0_BACKUP job will execute every week on Sunday at 03:00, so the job start date and its first run timing are set according to my convenience.

begin
dbms_scheduler.set_attribute(
name => 'RMAN_INC0_BACKUP',
attribute => 'start_date',
value => trunc(sysdate)+3/24);
end;
/

Test your backup job manually in SQL prompt by instantiating RMAN_INC0_BACKUP job.

SQL> exec dbms_scheduler.run_job('RMAN_INC0_BACKUP');
PL/SQL procedure successfully completed.

Verify running RMAN backup status by issuing following SQL query, It will show you RMAN backup details with start time & end time.

select SESSION_KEY, INPUT_TYPE, STATUS,
to_char(START_TIME,'mm/dd/yy hh24:mi') start_time,
to_char(END_TIME,'mm/dd/yy hh24:mi') end_time,
elapsed_seconds/3600 hrs
from V$RMAN_BACKUP_JOB_DETAILS
order by session_key;

In case of any error during the test run, you can check the details of the error by issuing the following query, or query the dba_scheduler_job_run_details dictionary view directly for more details.

select JOB_NAME,STATUS,ERROR#,CREDENTIAL_NAME from dba_scheduler_job_run_details where JOB_NAME like 'RMAN%';

After successful completion of the test run, enable and schedule it with the following procedure by setting the repeat_interval parameter. In my case the RMAN_INC0_BACKUP job will execute every week on Sunday at 03:00.

begin
dbms_scheduler.set_attribute(
name => 'RMAN_INC0_BACKUP',
attribute => 'repeat_interval',
value => 'freq=daily;byday=sun;byhour=03');
dbms_scheduler.enable( 'RMAN_INC0_BACKUP' );
end;
/

Ensure dbms_scheduler job details by issuing the following query OR you can also query to dba_scheduler_jobs and dba_scheduler_job_args.

SQL> select job_name,enabled,owner, state from dba_scheduler_jobs where job_name in ('RMAN_INC0_BACKUP');

Keep your eye on behavior of dbms_scheduler job by issuing the following query:

SQL> select job_name,RUN_COUNT,LAST_START_DATE,NEXT_RUN_DATE from dba_scheduler_jobs where job_name in ('RMAN_INC0_BACKUP');
SQL> select * from dba_scheduler_job_args where job_name like 'RMAN%';

Create a level 1 backup job RMAN_INC1_BACKUP in accordance with the above method; the only difference is the repeat_interval:

begin
dbms_scheduler.set_attribute(
name => 'RMAN_INC1_BACKUP',
attribute => 'repeat_interval',
value => 'freq=daily;byday=mon,tue,wed,thu,fri,sat;byhour=03');
dbms_scheduler.enable( 'RMAN_INC1_BACKUP' );
end;
/

Important Note:
DBMS_SCHEDULER is smart enough to start backup on the node where the last backup was successfully executed.

参考:

https://dbatricksworld.com/how-to-backup-oracle-rac-11gr2-database-with-rman-backup-utility-with-the-help-of-dbms_scheduler-part-i-rman-full-database-backup/

http://dbatricksworld.com/how-to-backup-oracle-rac-11gr2-database-with-rman-backup-utility-with-the-help-of-dbms_scheduler-part-ii-rman-incremental-database-backup/
