// /examples/workqueue/main.go
// Run starts the informer, waits for its cache to sync, launches
// threadiness worker goroutines and then blocks on stopCh.
//
// If the cache sync does not succeed, the error is reported through
// runtime.HandleError and Run returns without starting any worker.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer runtime.HandleCrash()

	// Shut the queue down on exit so the workers stop when we are done.
	defer c.queue.ShutDown()

	klog.Info("Starting Pod controller")

	// Start the informer.
	go c.informer.Run(stopCh)

	// Items from the queue must not be processed before all involved
	// caches have been synced.
	synced := cache.WaitForCacheSync(stopCh, c.informer.HasSynced)
	if !synced {
		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
		return
	}

	// Launch the workers; each one runs the business-logic callback
	// c.runWorker via wait.Until.
	for remaining := threadiness; remaining > 0; remaining-- {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
}
// /tools/cache/reflector.go
// Run starts a watch and handles watch events, restarting the watch if it is
// closed. It exits when stopCh is closed.
//
// Each round calls ListAndWatch; a non-nil error from it is handed to
// utilruntime.HandleError, and wait.Until schedules the next round using
// r.period.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	// One round of list-and-watch; errors are reported, not returned.
	listAndWatch := func() {
		err := r.ListAndWatch(stopCh)
		if err == nil {
			return
		}
		utilruntime.HandleError(err)
	}

	wait.Until(listAndWatch, r.period, stopCh)
}
// /tools/cache/delta_fifo.go
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
//
// Pop blocks while the queue is empty. If the FIFO is reported closed while
// the queue is empty, it returns FIFOClosedError. The popped item is removed
// from both f.queue and f.items before process is called; if process returns
// an ErrRequeue, the item is put back with addIfNotPresent and the wrapped
// error is returned instead. f.lock is acquired on entry and released on
// return, so process is invoked between Lock and Unlock.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for {
		if len(f.queue) == 0 {
			// Block until a new item is enqueued. The original notes say
			// Close() sets f.closed and broadcasts the condition, which
			// wakes this wait so that Pop can return.
			if f.IsClosed() {
				return nil, FIFOClosedError
			}
			f.cond.Wait()
			continue
		}

		// f.queue is a slice: take the first key and drop it from the slice.
		key := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}

		// Look up the Deltas for this key in the items map.
		deltas, found := f.items[key]
		if !found {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, key)

		// process is the entry point to the next stage of handling.
		err := process(deltas)
		if requeue, isRequeue := err.(ErrRequeue); isRequeue {
			f.addIfNotPresent(key, deltas)
			err = requeue.Err
		}

		// No copy of the deltas is needed here, because ownership is
		// transferred to the caller.
		return deltas, err
	}
}
// /examples/workqueue/main.go
// main (excerpt) wires an informer's event handlers to the workqueue. The
// lines consisting of "..." mark code elided from this excerpt.
func main() {
...
// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
// NOTE(review): the third argument (0) is presumably the resync period — confirm
// against the cache.NewIndexerInformer signature.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
// AddFunc enqueues the key of the added object; objects whose key cannot be
// computed are skipped.
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
// UpdateFunc enqueues the key computed from the new version of the object.
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
// DeleteFunc enqueues the key of the deleted object.
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
...
}
// /examples/workqueue/main.go
// processNextItem takes one key from the workqueue, runs the business logic
// on it and passes the result to handleErr. It returns false when the queue
// signals that it is shutting down, and true otherwise.
func (c *Controller) processNextItem() bool {
	// Corresponds to step 8: wait until there is a new item in the
	// workqueue and read one key from it.
	key, shutdown := c.queue.Get()
	if shutdown {
		return false
	}

	// Tell the queue that we are done with processing this key. This unblocks
	// the key for other workers and allows safe parallel processing, because
	// two pods with the same key are never processed in parallel.
	defer c.queue.Done(key)

	// Invoke the business logic, then let handleErr decide whether the key
	// has to be retried (it retries a limited number of times on error).
	syncErr := c.syncToStdout(key.(string))
	c.handleErr(syncErr, key)

	return true
}
// /examples/workqueue/main.go
// syncToStdout is the business logic of the controller. It looks key up in
// the indexer and prints to stdout whether the Pod still exists.
//
// It returns the lookup error, if any, so that the caller can retry the key;
// otherwise it returns nil.
func (c *Controller) syncToStdout(key string) error {
	// Corresponds to step 9: read the object from the indexer.
	obj, exists, err := c.indexer.GetByKey(key)
	if err != nil {
		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
		return err
	}

	if !exists {
		// Below we will warm up our cache with a Pod, so that we will see a delete for one pod
		fmt.Printf("Pod %s does not exist anymore\n", key)
		return nil
	}

	// Note that you also have to check the uid if you have a local controlled resource, which
	// is dependent on the actual instance, to detect that a Pod was recreated with the same name
	fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
	return nil
}
// handleErr inspects the result of a sync and makes sure a failed key is
// retried later. On success the key's rate-limiting history is forgotten;
// on failure the key is re-enqueued rate limited until it has been requeued
// 5 times, after which it is dropped and the error is reported.
func (c *Controller) handleErr(err error, key interface{}) {
	switch {
	case err == nil:
		// Forget about the #AddRateLimited history of the key on every
		// successful synchronization. This ensures that future processing
		// of updates for this key is not delayed because of an outdated
		// error history.
		c.queue.Forget(key)

	case c.queue.NumRequeues(key) < 5:
		// This controller retries 5 times if something goes wrong.
		klog.Infof("Error syncing pod %v: %v", key, err)

		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the key will be processed later
		// again.
		c.queue.AddRateLimited(key)

	default:
		// Retries are exhausted: stop trying.
		c.queue.Forget(key)

		// Report to an external entity that, even after several retries, we
		// could not successfully process this key.
		runtime.HandleError(err)
		klog.Infof("Dropping pod %q out of the queue: %v", key, err)
	}
}
// Dialer (excerpt) shows only the Control hook field of the dialer type.
type Dialer struct {
	// If Control is not nil, it is called after creating the network
	// connection but before actually dialing.
	//
	// Network and address parameters passed to Control method are not
	// necessarily the ones passed to Dial. For example, passing "tcp" to Dial
	// will cause the Control function to be called with "tcp4" or "tcp6".
	Control func(network, address string, c syscall.RawConn) error
}
❯ asciinema -h
usage: asciinema [-h] [--version] {rec,play,cat,upload,auth} ...

Record and share your terminal sessions, the right way.

positional arguments:
  {rec,play,cat,upload,auth}
    rec                 Record terminal session
    play                Replay terminal session
    cat                 Print full output of terminal session
    upload              Upload locally saved terminal session to asciinema.org
    auth                Manage recordings on asciinema.org account

optional arguments:
  -h, --help            show this help message and exit
  --version             show program's version number and exit

example usage:
  Record terminal and upload it to asciinema.org:
    asciinema rec
  Record terminal to local file:
    asciinema rec demo.cast
  Record terminal and upload it to asciinema.org, specifying title:
    asciinema rec -t "My git tutorial"
  Record terminal to local file, limiting idle time to max 2.5 sec:
    asciinema rec -i 2.5 demo.cast
  Replay terminal recording from local file:
    asciinema play demo.cast
  Replay terminal recording hosted on asciinema.org:
    asciinema play https://asciinema.org/a/difqlgx86ym6emrmd8u62yqu8
  Print full output of recorded session:
    asciinema cat demo.cast

For help on a specific command run:
  asciinema <command> -h
录屏
录屏命令
asciinema rec
此时生成的文件将保存到临时目录里,一般为 /tmp/目录
也可以指定文件名
asciinema rec demo.cast
当看到以下信息表示录屏工作开始,以后的操作将会被记录下来
asciinema: recording asciicast to /tmp/tmpg4auzrud-ascii.cast
asciinema: press <ctrl-d> or type "exit" when you're done
此时你可以进行正常的操作。
当操作完成后,按 ctrl-d 或输入 exit 退出录屏,会看到以下提示信息
asciinema: recording finished
asciinema: press <enter> to upload to asciinema.org, <ctrl-c> to save locally
asciinema: asciicast saved to /tmp/tmp1tj9jqnx-ascii.cast
asciinema upload demo.cast
View the recording at:
https://asciinema.org/a/KG2utenPw4pXk12TcEprPDaRh
This installation of asciinema recorder hasn't been linked to any asciinema.org
account. All unclaimed recordings (from unknown installations like this one)
are automatically archived 7 days after upload.
If you want to preserve all recordings made on this machine, connect this
installation with asciinema.org account by opening the following link:
https://asciinema.org/connect/4fc6bdf3-ecc4-445a-a045-540aa101dee1
我们先介绍一下在 Linux 中如何创建、管理和删除 TUN/TAP 虚拟设备;但要充分利用这些设备,必须通过编写程序代码来实现,后面会给出网友整理的演示代码。
命令行输入 ip help 查看 ip 命令是否支持 tun/tap 工具,支持的话就会显示 tuntap 选项:
# ip help
Usage: ip [ OPTIONS ] OBJECT { COMMAND | help }
ip [ -force ] -batch filename
where OBJECT := { link | addr | addrlabel | route | rule | neigh | ntable |
tunnel | tuntap | maddr | mroute | mrule | monitor | xfrm |
netns | l2tp | tcp_metrics | token }
不支持就请升级或下载最新的 iproute2 工具包,也可以使用类似的 tunctl 工具。
先查看一下 ip tuntap 的基本用法
# ip tuntap help
Usage: ip tuntap { add | del | show | list | lst | help } [ dev PHYS_DEV ]
[ mode { tun | tap } ] [ user USER ] [ group GROUP ]
[ one_queue ] [ pi ] [ vnet_hdr ] [ multi_queue ] [ name NAME ]
Where: USER := { STRING | NUMBER }
GROUP := { STRING | NUMBER }
创建 tap/tun 设备:
# ip tuntap add dev tap0 mod tap # 创建 tap
# ip tuntap add dev tun0 mod tun # 创建 tun
# ifconfig -a
新添加的虚拟网卡默认是 DOWN 状态. 对于 tun 类型的虚拟网卡,它的MAC地址全是 0,这个是正常的
激活虚拟网卡
# ip link set tun0 up
# ip link set tap0 up
对它的操作与普通网卡的命令是一样的
分配IP
# ip addr add 10.0.0.1/24 dev tun0
此时 PING 10.0.0.1 是可以通的。
# ping 10.0.0.1
PING 10.0.0.1 (10.0.0.1) 56(84) bytes of data.
64 bytes from 10.0.0.1: icmp_seq=1 ttl=64 time=0.031 ms
64 bytes from 10.0.0.1: icmp_seq=2 ttl=64 time=0.036 ms
64 bytes from 10.0.0.1: icmp_seq=3 ttl=64 time=0.031 ms
64 bytes from 10.0.0.1: icmp_seq=4 ttl=64 time=0.037 ms
删除 tap/tun 设备:
# ip tuntap del dev tap0 mod tap # 删除 tap
# ip tuntap del dev tun0 mod tun # 删除 tun
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
//
// The channel has a buffer of one element, and sendTime is registered as
// the callback that delivers the time on it.
func NewTimer(d Duration) *Timer {
	ch := make(chan Time, 1)

	timer := new(Timer)
	timer.C = ch
	timer.r = runtimeTimer{
		when: when(d),
		f:    sendTime,
		arg:  ch,
	}

	startTimer(&timer.r)
	return timer
}
// sendTime performs a non-blocking send of the current time on c, which must
// hold a chan Time. The seq argument is not used.
func sendTime(c interface{}, seq uintptr) {
	ch := c.(chan Time)

	// Used in NewTimer, the send cannot block anyway (the channel is
	// buffered). Used in NewTicker, dropping sends on the floor is the
	// desired behavior when the reader gets behind, because the sends are
	// periodic.
	select {
	case ch <- Now():
	default:
	}
}
type Timer struct { C <-chan Time r runtimeTimer }
一共两个字段。为了理解方便,我们称 runtimeTimer 为 timer 值。
我们再看一下其中的 runtimeTimer 结构体的声明
// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
	pp       uintptr
	when     int64
	period   int64
	f        func(interface{}, uintptr) // NOTE: must not be closure
	arg      interface{}
	seq      uintptr
	nextwhen int64
	status   uint32
}