def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
python类write()的实例源码
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def __mod__(self, fields):
"""Interpolation into ``UrlEncoded``s is disabled.
If you try to write ``UrlEncoded("%s") % "abc", will get a
``TypeError``.
"""
raise TypeError("Cannot interpolate into a UrlEncoded object.")
def connect(self):
"""Returns an open connection (socket) to the Splunk instance.
This method is used for writing bulk events to an index or similar tasks
where the overhead of opening a connection multiple times would be
prohibitive.
:returns: A socket.
**Example**::
import splunklib.binding as binding
c = binding.connect(...)
socket = c.connect()
socket.write("POST %s HTTP/1.1\\r\\n" % "some/path/to/post/to")
socket.write("Host: %s:%s\\r\\n" % (c.host, c.port))
socket.write("Accept-Encoding: identity\\r\\n")
socket.write("Authorization: %s\\r\\n" % c.token)
socket.write("X-Splunk-Input-Mode: Streaming\\r\\n")
socket.write("\\r\\n")
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if self.scheme == "https":
sock = ssl.wrap_socket(sock)
sock.connect((socket.gethostbyname(self.host), self.port))
return sock
def _write_int(value, stream):
stream.write(struct.pack("!i", value))
def main(socket):
while (True):
input_length = _read_int(socket)
data = socket.read(input_length)
result = transform(data)
resultBytes = result.encode()
_write_int(len(resultBytes), socket)
socket.write(resultBytes)
socket.flush()